「[[マイクロソフト系技術情報 Wiki>http://techinfoofmicrosofttech.osscons.jp/]]」は、「[[Open棟梁Project>https://github.com/OpenTouryoProject/]]」,「[[OSSコンソーシアム .NET開発基盤部会>https://www.osscons.jp/dotNetDevelopmentInfrastructure/]]」によって運営されています。
-[[戻る>IoT関連の通信プロトコル]]
--[[Mosquitto]]
--RabbitMQ
* 目次 [#qd06bf62]
#contents
*概要 [#j1decbb5]
-AMQPブローカーのOSS
- MQTT プロトコルサポートのプラグインもある。
*詳細 [#b0ec823e]
**Windows [#c5bcbed7]
***インストール [#bce7313a]
-Erlang
--インストール(既定値)
---Erlang Programming Language~
https://www.erlang.org/downloads
--環境変数設定~
不要だった。
-RabbitMQ
--インストール(既定値)
---Installing on Windows — RabbitMQ~
https://www.rabbitmq.com/install-windows.html
***ブローカー起動 [#f90f0803]
-サービスとしてインストールされる。
-起動:メニューから[RabbitMQ Service - start]を選択
-停止:メニューから[RabbitMQ Service - stop]を選択
***送受信 [#uabecfb4]
-[[rabbitmqadmin>#k147feaf]]で動作確認。
-[[RabbitMQ.Client>#b2261bad]]で動作確認。
**その他の環境 [#v35594f3]
***Linux [#f9bc4ea2]
***Raspberry Pi [#o2016d76]
***Docker on Windows [#nb372206]
***Docker on Raspberry Pi [#w472f821]
***SaaS / PaaS [#t3f0cf64]
-Azure:[[Azure Service Bus>Azureのメッセージング・サービス#a75511f3]](ブリッジ機能)
-Azure:Amazon MQ for RabbitMQ
-GCP:Cloud Pub/Sub?
**管理ツール [#ba5d0b37]
***管理画面 [#y14216dd]
-設定
--メニューから[RabbitMQ Command Prompt (sbin dir)]を選択
--以下の2つのコマンドを実行する。
---
 >rabbitmq-plugins enable rabbitmq_management
 ...
---
 >rabbitmq-plugins enable rabbitmq_tracing
 ...
-アクセス~
--HTTPSではなくHTTP~
http://127.0.0.1:15672/
--アカウントは~
guest/guest
-参考
--RabbitMQのインストールと管理画面の有効化(MacOSX/Linux/Windows) - Qiita~
https://qiita.com/AKB428/items/a2662fbb624ce7659025
***rabbitmqctl [#z7c314c7]
-標準の管理ツールでvhost作成・ユーザ作成・権限付与などの管理作業を行う。
-Windowsの場合~
以下から使用する。
--格納場所は以下のディレクトリ。
 ...RabbitMQ Server\rabbitmq_server-x.x.x\sbin
--.batファイルとして提供されている。
 rabbitmqctl.bat
--メニューから[RabbitMQ Command Prompt (sbin dir)]を選択し、.batは省略可能して実行可能。
 >rabbitmqctl(.bat) status
-参考
--RabbitMQ管理コマンド(rabbitmqctl)使い方 - Qiita~
https://qiita.com/tamikura@github/items/5293cda4c0026b2d7022
***rabbitmq-plugins [#x450bd99]
[[前述>#y14216dd]]のrabbitmq-pluginsでプラグインを制御できる。
***rabbitmqadmin [#k147feaf]
-[[管理画面>#y14216dd]]に付属するツール(REST APIのクライアント)で、~
[[Exchange や Queue の作成、Messageの送受信>#m2c0116f]]などを行う。
-pythonが必要で、rabbitmqadminファイルを[[管理画面>#y14216dd]]からダウンロードして、以下のように使用する。
--Linuxルの場合~
rabbitmqadminファイルを/usr/local/binにコピーし、
 $ rabbitmqadmin ...サブ・コマンド...
--Windowsの場合
rabbitmqadminファイルをコピーし(sbin dirが良いかと)、
 >python.exe rabbitmqadmin ...サブ・コマンド...
-Messageの送受信
--受信
---例
 rabbitmqadmin -u admin -p admin -V my-vhost get queue=my-q ackmode=ack_requeue_true
---初期値
 rabbitmqadmin get queue=my-q ackmode=ack_requeue_true
--送信
---例
 rabbitmqadmin -u admin -p admin -V my-vhost publish routing_key=my-key exchange=my-exchange payload=test
---初期値
 rabbitmqadmin publish routing_key=my-key exchange=my-exchange payload=test
-参考
--Management Command Line Tool — RabbitMQ~
https://www.rabbitmq.com/management-cli.html
--windows RabbitMQ doesnt have rabbitmqadmin in sbin folder - Stack Overflow~
https://stackoverflow.com/questions/43547461/windows-rabbitmq-doesnt-have-rabbitmqadmin-in-sbin-folder
***... [#p5e12a9d]
***参考 [#o542b41b]
-Command Line Tools — RabbitMQ~
https://www.rabbitmq.com/cli.html
**設定ファイル [#te4b8341]
***新・旧の設定ファイル [#r68d2271]
-場所~
インストール後、設定ファイルは存在しないので、必要なら、作成する必要がある。
--Windows:
 %APPDATA%\RabbitMQ\rabbitmq.conf
 C:\Users\xxxxx\AppData\Roaming\RabbitMQ\rabbitmq.conf
--Linux:
 /etc/rabbitmq/rabbitmq.config
-新ファイル
--rabbitmq.conf
---新式の書き方。
---sysctl 方式で記述。
--advanced.config
---旧式の書き方。
---新スタイルの設定形式では表現できない限られた設定項目。
-旧ファイル
--rabbitmq.config
---旧式の書き方。
---Erlang の標準設定ファイル書式で記述。
-参考
--rabbitmq設定ファイルはどこにありますか? - ja.pays-tarusate.org~
https://ja.pays-tarusate.org/377570-where-is-rabbitmq-config-file-IOKVEA
***インポート・エクスポート [#o65c9989]
-エクスポート
 rabbitmqadmin export c:\export.json
-インポート~
rabbitmq.confに以下を追記する。
 management.load_definitions = c:\import.json
***管理ツール [#jebe6709]
前述の[[管理ツール>#ba5d0b37]]を使用して設定を行う。
**認証 [#n95ae828]
バーチャルホスト、キュー、エクスチェンジなどのリソースへのアクセスを承認する
***パスワード [#b63d948d]
-デフォルト・ユーザー
--ユーザー名「guest」パスワード「guest」。
--バーチャルホスト「/」へのフルアクセスを許可される。
--規定ではホスト・ローカル接続にしか使用できない。~
loopback_usersの設定をnoneにすることで、~
リモートホストからの接続を許可することが可能。
-ユーザの管理
--CLI
---追加
 rabbitmqctl add_user 'username' 'password'
---更新
 rabbitmqctl change_password
---削除
 rabbitmqctl delete_user 'username'
---一覧
 rabbitmqctl list_users
---アクセス許可設定~
[[Exchange>#k283cf9f]]以下([[Queue>#g034aa64]]など)のアクセス許可を設定できる。~
(Topicの認可というのものあるらしい(既定では無効になっている))~
・追加
 # First ".*" for configure permission on every entity
 # Second ".*" for write permission on every entity
 # Third ".*" for read permission on every entity
 rabbitmqctl set_permissions -p "custom-vhost" "username" ".*" ".*" ".*"
・削除
 rabbitmqctl clear_permissions -p "custom-vhost" "username"
・一括
 for v in $(rabbitmqctl list_vhosts --silent); do rabbitmqctl set_permissions -p $v "a-user" ".*" ".*" ".*"; done
--その他
---ノードブート時の定義のインポート
---別の認証バックエンド(LDAP、HTTP、AMQPなど)
-参考
--RabbitMQ
---Authentication, Authorisation, Access Control~
https://www.rabbitmq.com/access-control.html
---Credentials and Passwords~
https://www.rabbitmq.com/passwords.html
***証明書 [#s568939e]
-サーバー認証とクライアント認証の両方が可能。
-サーバー認証~
rabbitmq-auth-mechanism-sslを有効にして、~
クライアントがEXTERNALメカニズムを使うように設定
-クライアント認証
--ユーザー名はCNと一致する。
--クライアントが提供するパスワードはすべて無視される。
-参考
--RabbitMQ
---TLS Support~
https://www.rabbitmq.com/ssl.html
---Troubleshooting TLS-enabled Connections~
https://www.rabbitmq.com/troubleshooting-ssl.html
--RabbitMQでTLS通信を有効にする - OutSystems~
https://success.outsystems.com/ja-jp/Support/Enterprise_Customers/Maintenance_and_Operations/Cache_Invalidation_in_OutSystems_11/Enable_TLS_communication_in_RabbitMQ
--Jeffrey A Becker's Blog - Using SSL client certificates for authentication with RabbitMQ~
https://weblogs.asp.net/jeffreyabecker/Using-SSL-client-certificates-for-authentication-with-RabbitMQ
--RabbitMQをSSL対応させてみる(with Graylogでログを受け取らせてみる) | 俺的備忘録 〜なんかいろいろ〜~
https://orebibou.com/ja/home/201702/20170212_001/
**ブローカーモデル [#m2c0116f]
AMQP 1.0 で削除されたので、コレは、純粋に、RabbitMQの話。
***Producer [#a530013b]
送信する人
***Consumer [#gdf3a04b]
受信する人
***Message [#kf30e258]
送受信対象のメッセージ
***Queue [#g034aa64]
キュー
***Exchange [#k283cf9f]
Queueへの配送ルール
-direct~
routing keyとbinding keyが完全したMessageだけQueueに配送。
-fanout~
binding keyを無視して全てのQueueにMessageを配送(ブロードキャスト)
-topic~
routing keyとbinding keyで、部分一致したMessageをQueueに配送。
***Vhosts (Virtual Hosts) [#lac43cb7]
-固有の名前を持つ個々のコンテナ(論理的グループ)
-コンテナ中には、以下リソースがある。
--Connection
--[[Exchange>#k283cf9f]]
--[[Queue>#g034aa64]]
--Binding
--ユーザー権限
--およびその他のシステムリソース
**送受信(C#) [#d03162e8]
***RabbitMQ.Client [#b2261bad]
様々な言語向けにリリースされている。
-以下のコードは[[.NET Core]]でも大方動く。
--一箇所、変更した。
---ea.Body
>↓ ↓ ↓
---ea.Body.ToArray()
--Qiitaのサンプルをベースに弄った。
 using Newtonsoft.Json;
 using RabbitMQ;
 using RabbitMQ.Client;
 using RabbitMQ.Client.Events;
 using System;
 using System.Text;
 using System.Threading;
 using System.Threading.Tasks;
 using System.Net.Security;
 using System.Security.Cryptography.X509Certificates;
 
 namespace ConsoleApp1
 {
     class Program
     {
         static void Main(string[] args)
         {
             // ホスト名
             var hostname = "localhost";
 
             // Taskキャンセルトークン
             var tokenSource = new CancellationTokenSource();
 
             Console.WriteLine($"start .Net RabbitMQ Example. Ctl+C to exit");
 
             X509Certificate2 cli = new X509Certificate2(@"C:\certs\client.pfx", "xxxxx");
 
             // ファクトリ生成
             var factory = new ConnectionFactory()
             {
                 HostName = hostname,
                 //UserName = "hogecli",
                 //Password = "hogecli",
                 AuthMechanisms = new IAuthMechanismFactory[] { new ExternalMechanismFactory() },
                 Ssl = new SslOption
                 {
                     Enabled = true,
                     ServerName = hostname,
                     AcceptablePolicyErrors = SslPolicyErrors.RemoteCertificateNameMismatch 
                                              | SslPolicyErrors.RemoteCertificateChainErrors,
                     Certs = new X509CertificateCollection(new X509Certificate[] { cli })
                 }
             };
 
             // パブリッシャータスク
             var pTask = Task.Run(() => new Action<ConnectionFactory, CancellationToken>(async (f, cancel) => {
                 // コネクション&チャンネル生成
                 using (var conn = f.CreateConnection())
                 using (var channel = conn.CreateModel())
                 {
                     // Exchange生成
                     channel.ExchangeDeclare("test", "fanout", false, true);
 
                     while (true)
                     {
                         // キャンセル待ち
                         if (cancel.IsCancellationRequested)
                         {
                             break;
                         }
 
                         var msg = new SendMessage()
                         {
                             Message = "Hello",
                             Timestamp = DateTime.UtcNow.ToBinary()
                         };
                         var body = JsonConvert.SerializeObject(msg);
 
                         // Publish!!
                         try
                         {
                             channel.BasicPublish("test", "", null, Encoding.UTF8.GetBytes(body));
                             Console.WriteLine($"success send. message: {msg.Message}, timestamp: {msg.Timestamp}");
                         }
                         catch (Exception ex)
                         {
                             Console.WriteLine($"failer send. reason: {ex.Message}");
                         }
 
                         await Task.Delay(10000);
                     }
                 }
             })(factory, tokenSource.Token), tokenSource.Token);
 
             // コンシューマータスク
             var cTask = Task.Run(() => new Action<ConnectionFactory, CancellationToken>((f, cancel) => {
                 // コネクション&チャンネル生成
                 using (var conn = f.CreateConnection())
                 using (var channel = conn.CreateModel())
                 {
                     // Exchange生成
                     channel.ExchangeDeclare("test", "fanout", false, true);
 
                     // Queue生成
                     var queueName = channel.QueueDeclare().QueueName;
 
                     // Bind Queue
                     channel.QueueBind(queueName, "test", "");
 
                     // コンシューマー生成
                     var consumer = new EventingBasicConsumer(channel);
 
                     // 受信イベント定義
                     consumer.Received += (_, ea) =>
                     {
                         var msg = JsonConvert.DeserializeObject<ConsumedMessage>(Encoding.UTF8.GetString(ea.Body.ToArray()));
                         Console.WriteLine($"success consumed. message: {msg.Message}, timestamp: {msg.Timestamp}");
                     };
 
                     // コンシューマー登録
                     channel.BasicConsume(queueName, true, consumer);
 
                     while (true)
                     {
                         // キャンセル待ち   
                         if (cancel.IsCancellationRequested)
                         {
                             break;
                         }
                     }
                 }
             })(factory, tokenSource.Token), tokenSource.Token);
 
             // Ctl+C待機
             Console.CancelKeyPress += (_, e) =>
             {
                 e.Cancel = true;
                 tokenSource.Cancel(); // Taskキャンセル
             };
 
             Task.WaitAll(pTask, cTask);
 
             Console.WriteLine("stop .Net RabbitMQ Example. press any key to close.");
             Console.ReadKey();
         }
     }
 
     public class SendMessage
     {
         public string Message { get; set; }
         public long Timestamp { get; set; }
     }
 
     public class ConsumedMessage
     {
         public string Message { get; set; }
         public long Timestamp { get; set; }
     }
 }
-参考
--RabbitMQでPublisherとConsumer(C#編)~
https://qiita.com/lightstaff/items/1a8a1e602d486571cb3a
--RabbitMQ tutorial - "Hello World!" — RabbitMQ~
https://www.rabbitmq.com/tutorials/tutorial-one-dotnet.html
--より洗練された実装~
https://github.com/OpenTouryoProject/DxCommon/tree/develop/Edge/RabbitMQ/Client/CS
***EasyNetQ [#df0bbf55]
-RabbitMQのための優れた.NET API
-初期の開発は、15belowがスポンサーとなって行われた。
***AMQPクライアント [#he029c15]
RabbitMQはサポートしない?
-AMQPNetLite
-Microsoft.Azure.Amqp
-...
*チュートリアル [#x8a94036]
**Windows [#ua9b4728]
***証明書の準備 [#l31f2bae]
[[Mosquittoのチュートリアル>Mosquitto#p4f5a191]]と同じ。
***SSLの設定 [#lc58e121]
-サーバー証明書を利用
--advanced.config
 [
   {rabbit,  [ 
     {ssl_listeners, [5671]},
     {ssl_options, [{cacertfile,"c:/certs/ca.crt"},
                   {certfile,"c:/certs/server.crt"},
                   {keyfile,"c:/certs/server.key"},
                   {verify,verify_none},
                   {fail_if_no_peer_cert,false}]}
   ]}
 ].
--サービスの再起動~
[[管理画面>#y14216dd]]でSSLポートを確認する。
 Listening ports
 amqp/ssl	0.0.0.0	5671
 amqp/ssl	::	5671
-パスワード認証~
[[管理画面>#y14216dd]]でユーザーを作成する。
-クライアント証明書を利用
--プラグインを有効化
 rabbitmq-plugins enable rabbitmq_auth_mechanism_ssl
--CNと一致したユーザーを作成する。
--advanced.config
 [
   {rabbit,  [ 
     {ssl_listeners, [5671]},
     {auth_mechanisms, ['EXTERNAL', 'PLAIN']},
     {ssl_cert_login_from, common_name},
     {ssl_options, [{cacertfile,"c:/certs/ca.crt"},
                   {certfile,"c:/certs/server.crt"},
                   {keyfile,"c:/certs/server.key"},
                   {verify,verify_peer},
                   {fail_if_no_peer_cert,true}]}
   ]}
 ].
--サービスの再起動~
[[管理画面>#y14216dd]]でSSLポートを確認する。
 Listening ports
 amqp/ssl	0.0.0.0	5671
 amqp/ssl	::	5671
***プログラムからアクセス [#bd32bdbe]
-認証局の証明書(ca.crt)は、[[証明書ストア>証明書#dccdb7e5]]の~
信頼されたルート証明機関に入れておく必要がある。
-サーバー証明書を利用
--以下を書き換える。
 using System.Net.Security; // 追加
 
 ...
 
 // ファクトリ生成
 var factory = new ConnectionFactory()
 {
   HostName = hostname,
   Ssl = new SslOption // 追加
   {
     Enabled = true,
     ServerName = hostname,
     AcceptablePolicyErrors = SslPolicyErrors.RemoteCertificateNameMismatch 
                              | SslPolicyErrors.RemoteCertificateChainErrors,
   }
 };
--デバッグ実行で動作確認~
動作した。
-パスワード認証を利用
--以下を書き換える。
 var factory = new ConnectionFactory()
 {
   HostName = hostname,
 
   UserName = "xxxxx", // 追加
   Password = "xxxxx", // 追加
--デバッグ実行で動作確認~
動作した。
-クライアント証明書を利用
--以下を書き換える。
 using System.Security.Cryptography.X509Certificates; // 追加
 
 ...
 
 X509Certificate2 cli = new X509Certificate2(@"C:\certs\client.pfx", "xxxxx"); // 追加
 
 // ファクトリ生成
 var factory = new ConnectionFactory()
 {
   HostName = hostname,
   //UserName = "xxxxx", // 削除
   //Password = "xxxxx", // 削除
   AuthMechanisms = new IAuthMechanismFactory[]{ new ExternalMechanismFactory()}, // 追加
   Ssl = new SslOption
   {
     Enabled = true,
     ServerName = hostname,
     AcceptablePolicyErrors = SslPolicyErrors.RemoteCertificateNameMismatch 
                              | SslPolicyErrors.RemoteCertificateChainErrors,
     Certs = new X509CertificateCollection(new X509Certificate[] { cli }) // 追加
   }
 };
--デバッグ実行で動作確認~
動作した(ssl_cert_login_fromを書く位置に注意!)。
**Linux [#jcd1f6f4]
*参考 [#y172abcc]
-RabbitMQ に入門してピョンピョンしてみた - kakakakakku blog~
https://kakakakakku.hatenablog.com/entry/2015/08/26/000853
**SIOS Tech. Lab [#g4a50d67]
-RabbitMQによる非同期処理~
https://tech-lab.sios.jp/archives/7902
-WindowsでRabbitMQ~
https://tech-lab.sios.jp/archives/7920
-Rest APIでRabbitMQ~
https://tech-lab.sios.jp/archives/7941
-.NETでRabbitMQ~
https://tech-lab.sios.jp/archives/8018
**Microsoft Docs [#dd5184c5]
-開発環境またはテスト環境の RabbitMQ でイベント バスを実装する~
https://docs.microsoft.com/ja-jp/dotnet/architecture/microservices/multi-container-microservice-net-applications/rabbitmq-event-bus-development-test-environment
-Azure Functions における Azure RabbitMQ バインド~
https://docs.microsoft.com/ja-jp/azure/azure-functions/functions-bindings-rabbitmq
-Azure Functions における RabbitMQ の出力バインド~
https://docs.microsoft.com/ja-jp/azure/azure-functions/functions-bindings-rabbitmq-output
**Qiita [#r64ad8af]
-RabbitMQをインストールする(Windows/Mac/Docker)~
https://qiita.com/suke_masa/items/5d583462727660663fde
-RabbitMQ でメッセージの送受信を行うまでの環境設定についてのメモ~
https://qiita.com/ptiringo/items/c554fa66f0d985394fed
----
Tags: [[:通信技術]], [[:.NET開発]], [[:IoT]]