「[[マイクロソフト系技術情報 Wiki>http://techinfoofmicrosofttech.osscons.jp/]]」は、「[[Open棟梁Project>https://github.com/OpenTouryoProject/]]」,「[[OSSコンソーシアム .NET開発基盤部会>https://www.osscons.jp/dotNetDevelopmentInfrastructure/]]」によって運営されています。

-[[戻る>IoT関連の通信プロトコル]]
--[[Mosquitto]]
--RabbitMQ

* 目次 [#qd06bf62]
#contents

*概要 [#j1decbb5]
-AMQPブローカーのOSS
- MQTT プロトコルサポートのプラグインもある。

*詳細 [#b0ec823e]

**Windows [#c5bcbed7]
***インストール [#bce7313a]

-Erlang

--インストール(既定値)

---Erlang Programming Language~
https://www.erlang.org/downloads

--環境変数設定~
不要だった。

-RabbitMQ

--インストール(既定値)
---Installing on Windows — RabbitMQ~
https://www.rabbitmq.com/install-windows.html

***ブローカー起動 [#f90f0803]
-サービスとしてインストールされる。
-起動:メニューから[RabbitMQ Service - start]を選択
-停止:メニューから[RabbitMQ Service - stop]を選択

***送受信 [#uabecfb4]
-[[rabbitmqadmin>#k147feaf]]で動作確認。
-[[RabbitMQ.Client>#b2261bad]]で動作確認。

**その他の環境 [#v35594f3]

***Linux [#f9bc4ea2]

***Raspberry Pi [#o2016d76]

***Docker on Windows [#nb372206]

***Docker on Raspberry Pi [#w472f821]

***SaaS / PaaS [#t3f0cf64]
-Azure:[[Azure Service Bus>Azureのメッセージング・サービス#a75511f3]](ブリッジ機能)
-Azure:Amazon MQ for RabbitMQ
-GCP:Cloud Pub/Sub?

**管理ツール [#ba5d0b37]

***管理画面 [#y14216dd]
-設定

--メニューから[RabbitMQ Command Prompt (sbin dir)]を選択

--以下の2つのコマンドを実行する。

---
 >rabbitmq-plugins enable rabbitmq_management
 ...

---
 >rabbitmq-plugins enable rabbitmq_tracing
 ...

-アクセス~

--HTTPSではなくHTTP~
http://127.0.0.1:15672/

--アカウントは~
guest/guest

-参考
--RabbitMQのインストールと管理画面の有効化(MacOSX/Linux/Windows) - Qiita~
https://qiita.com/AKB428/items/a2662fbb624ce7659025

***rabbitmqctl [#z7c314c7]
-標準の管理ツールでvhost作成・ユーザ作成・権限付与などの管理作業を行う。

-Windowsの場合~
以下から使用する。

--格納場所は以下のディレクトリ。
 ...RabbitMQ Server\rabbitmq_server-x.x.x\sbin

--.batファイルとして提供されている。
 rabbitmqctl.bat

--メニューから[RabbitMQ Command Prompt (sbin dir)]を選択し、.batは省略可能して実行可能。
 >rabbitmqctl(.bat) status

-参考
--RabbitMQ管理コマンド(rabbitmqctl)使い方 - Qiita~
https://qiita.com/tamikura@github/items/5293cda4c0026b2d7022

***rabbitmq-plugins [#x450bd99]
[[前述>#y14216dd]]のrabbitmq-pluginsでプラグインを制御できる。

***rabbitmqadmin [#k147feaf]
-[[管理画面>#y14216dd]]に付属するツール(REST APIのクライアント)で、~
[[Exchange や Queue の作成、Messageの送受信>#m2c0116f]]などを行う。

-pythonが必要で、rabbitmqadminファイルを[[管理画面>#y14216dd]]からダウンロードして、以下のように使用する。

--Linuxルの場合~
rabbitmqadminファイルを/usr/local/binにコピーし、
 $ rabbitmqadmin ...サブ・コマンド...

--Windowsの場合
rabbitmqadminファイルをコピーし(sbin dirが良いかと)、
 >python.exe rabbitmqadmin ...サブ・コマンド...

-Messageの送受信

--受信
---例
 rabbitmqadmin -u admin -p admin -V my-vhost get queue=my-q ackmode=ack_requeue_true
---初期値
 rabbitmqadmin get queue=my-q ackmode=ack_requeue_true

--送信
---例
 rabbitmqadmin -u admin -p admin -V my-vhost publish routing_key=my-key exchange=my-exchange payload=test
---初期値
 rabbitmqadmin publish routing_key=my-key exchange=my-exchange payload=test

-参考
--Management Command Line Tool — RabbitMQ~
https://www.rabbitmq.com/management-cli.html
--windows RabbitMQ doesnt have rabbitmqadmin in sbin folder - Stack Overflow~
https://stackoverflow.com/questions/43547461/windows-rabbitmq-doesnt-have-rabbitmqadmin-in-sbin-folder

***... [#p5e12a9d]

***参考 [#o542b41b]
-Command Line Tools — RabbitMQ~
https://www.rabbitmq.com/cli.html

**設定ファイル [#te4b8341]

***新・旧の設定ファイル [#r68d2271]
-場所~
インストール後、設定ファイルは存在しないので、必要なら、作成する必要がある。

--Windows:
 %APPDATA%\RabbitMQ\rabbitmq.conf
 C:\Users\xxxxx\AppData\Roaming\RabbitMQ\rabbitmq.conf

--Linux:
 /etc/rabbitmq/rabbitmq.config


-新ファイル
--rabbitmq.conf
---新式の書き方。
---sysctl 方式で記述。

--advanced.config
---旧式の書き方。
---新スタイルの設定形式では表現できない限られた設定項目。

-旧ファイル
--rabbitmq.config
---旧式の書き方。
---Erlang の標準設定ファイル書式で記述。

-参考
--rabbitmq設定ファイルはどこにありますか? - ja.pays-tarusate.org~
https://ja.pays-tarusate.org/377570-where-is-rabbitmq-config-file-IOKVEA

***インポート・エクスポート [#o65c9989]

-エクスポート
 rabbitmqadmin export c:\export.json

-インポート~
rabbitmq.confに以下を追記する。
 management.load_definitions = c:\import.json

***管理ツール [#jebe6709]
前述の[[管理ツール>#ba5d0b37]]を使用して設定を行う。

**認証 [#n95ae828]
バーチャルホスト、キュー、エクスチェンジなどのリソースへのアクセスを承認する

***パスワード [#b63d948d]

-デフォルト・ユーザー
--ユーザー名「guest」パスワード「guest」。
--バーチャルホスト「/」へのフルアクセスを許可される。
--規定ではホスト・ローカル接続にしか使用できない。~
loopback_usersの設定をnoneにすることで、~
リモートホストからの接続を許可することが可能。

-ユーザの管理

--CLI
---追加
 rabbitmqctl add_user 'username' 'password'
---更新
 rabbitmqctl change_password
---削除
 rabbitmqctl delete_user 'username'
---一覧
 rabbitmqctl list_users

---アクセス許可設定~
[[Exchange>#k283cf9f]]以下([[Queue>#g034aa64]]など)のアクセス許可を設定できる。~
(Topicの認可というのものあるらしい(既定では無効になっている))~
・追加
 # First ".*" for configure permission on every entity
 # Second ".*" for write permission on every entity
 # Third ".*" for read permission on every entity
 rabbitmqctl set_permissions -p "custom-vhost" "username" ".*" ".*" ".*"
・削除
 rabbitmqctl clear_permissions -p "custom-vhost" "username"
・一括
 for v in $(rabbitmqctl list_vhosts --silent); do rabbitmqctl set_permissions -p $v "a-user" ".*" ".*" ".*"; done

--その他
---ノードブート時の定義のインポート
---別の認証バックエンド(LDAP、HTTP、AMQPなど)

-参考
--RabbitMQ
---Authentication, Authorisation, Access Control~
https://www.rabbitmq.com/access-control.html
---Credentials and Passwords~
https://www.rabbitmq.com/passwords.html

***証明書 [#s568939e]
-サーバー認証とクライアント認証の両方が可能。

-サーバー認証~
rabbitmq-auth-mechanism-sslを有効にして、~
クライアントがEXTERNALメカニズムを使うように設定

-クライアント認証
--ユーザー名はCNと一致する。
--クライアントが提供するパスワードはすべて無視される。

-参考

--RabbitMQ
---TLS Support~
https://www.rabbitmq.com/ssl.html
---Troubleshooting TLS-enabled Connections~
https://www.rabbitmq.com/troubleshooting-ssl.html

--RabbitMQでTLS通信を有効にする - OutSystems~
https://success.outsystems.com/ja-jp/Support/Enterprise_Customers/Maintenance_and_Operations/Cache_Invalidation_in_OutSystems_11/Enable_TLS_communication_in_RabbitMQ

--Jeffrey A Becker's Blog - Using SSL client certificates for authentication with RabbitMQ~
https://weblogs.asp.net/jeffreyabecker/Using-SSL-client-certificates-for-authentication-with-RabbitMQ

--RabbitMQをSSL対応させてみる(with Graylogでログを受け取らせてみる) | 俺的備忘録 〜なんかいろいろ〜~
https://orebibou.com/ja/home/201702/20170212_001/

**ブローカーモデル [#m2c0116f]
AMQP 1.0 で削除されたので、コレは、純粋に、RabbitMQの話。

***Producer [#a530013b]
送信する人

***Consumer [#gdf3a04b]
受信する人

***Message [#kf30e258]
送受信対象のメッセージ

***Queue [#g034aa64]
キュー

***Exchange [#k283cf9f]
Queueへの配送ルール

-direct~
routing keyとbinding keyが完全したMessageだけQueueに配送。

-fanout~
binding keyを無視して全てのQueueにMessageを配送(ブロードキャスト)

-topic~
routing keyとbinding keyで、部分一致したMessageをQueueに配送。

***Vhosts (Virtual Hosts) [#lac43cb7]
-固有の名前を持つ個々のコンテナ(論理的グループ)
-コンテナ中には、以下リソースがある。
--Connection
--[[Exchange>#k283cf9f]]
--[[Queue>#g034aa64]]
--Binding
--ユーザー権限
--およびその他のシステムリソース

**送受信(C#) [#d03162e8]

***RabbitMQ.Client [#b2261bad]
様々な言語向けにリリースされている。

-以下のコードは[[.NET Core]]でも大方動く。

--一箇所、変更した。

---ea.Body

>↓ ↓ ↓

---ea.Body.ToArray()

--Qiitaのサンプルをベースに弄った。
 using Newtonsoft.Json;
 using RabbitMQ;
 using RabbitMQ.Client;
 using RabbitMQ.Client.Events;
 using System;
 using System.Text;
 using System.Threading;
 using System.Threading.Tasks;
 using System.Net.Security;
 using System.Security.Cryptography.X509Certificates;
 
 namespace ConsoleApp1
 {
     class Program
     {
         static void Main(string[] args)
         {
             // ホスト名
             var hostname = "localhost";
 
             // Taskキャンセルトークン
             var tokenSource = new CancellationTokenSource();
 
             Console.WriteLine($"start .Net RabbitMQ Example. Ctl+C to exit");
 
             X509Certificate2 cli = new X509Certificate2(@"C:\certs\client.pfx", "xxxxx");
 
             // ファクトリ生成
             var factory = new ConnectionFactory()
             {
                 HostName = hostname,
                 //UserName = "hogecli",
                 //Password = "hogecli",
                 AuthMechanisms = new IAuthMechanismFactory[] { new ExternalMechanismFactory() },
                 Ssl = new SslOption
                 {
                     Enabled = true,
                     ServerName = hostname,
                     AcceptablePolicyErrors = SslPolicyErrors.RemoteCertificateNameMismatch 
                                              | SslPolicyErrors.RemoteCertificateChainErrors,
                     Certs = new X509CertificateCollection(new X509Certificate[] { cli })
                 }
             };
 
             // パブリッシャータスク
             var pTask = Task.Run(() => new Action<ConnectionFactory, CancellationToken>(async (f, cancel) => {
                 // コネクション&チャンネル生成
                 using (var conn = f.CreateConnection())
                 using (var channel = conn.CreateModel())
                 {
                     // Exchange生成
                     channel.ExchangeDeclare("test", "fanout", false, true);
 
                     while (true)
                     {
                         // キャンセル待ち
                         if (cancel.IsCancellationRequested)
                         {
                             break;
                         }
 
                         var msg = new SendMessage()
                         {
                             Message = "Hello",
                             Timestamp = DateTime.UtcNow.ToBinary()
                         };
                         var body = JsonConvert.SerializeObject(msg);
 
                         // Publish!!
                         try
                         {
                             channel.BasicPublish("test", "", null, Encoding.UTF8.GetBytes(body));
                             Console.WriteLine($"success send. message: {msg.Message}, timestamp: {msg.Timestamp}");
                         }
                         catch (Exception ex)
                         {
                             Console.WriteLine($"failer send. reason: {ex.Message}");
                         }
 
                         await Task.Delay(10000);
                     }
                 }
             })(factory, tokenSource.Token), tokenSource.Token);
 
             // コンシューマータスク
             var cTask = Task.Run(() => new Action<ConnectionFactory, CancellationToken>((f, cancel) => {
                 // コネクション&チャンネル生成
                 using (var conn = f.CreateConnection())
                 using (var channel = conn.CreateModel())
                 {
                     // Exchange生成
                     channel.ExchangeDeclare("test", "fanout", false, true);
 
                     // Queue生成
                     var queueName = channel.QueueDeclare().QueueName;
 
                     // Bind Queue
                     channel.QueueBind(queueName, "test", "");
 
                     // コンシューマー生成
                     var consumer = new EventingBasicConsumer(channel);
 
                     // 受信イベント定義
                     consumer.Received += (_, ea) =>
                     {
                         var msg = JsonConvert.DeserializeObject<ConsumedMessage>(Encoding.UTF8.GetString(ea.Body.ToArray()));
                         Console.WriteLine($"success consumed. message: {msg.Message}, timestamp: {msg.Timestamp}");
                     };
 
                     // コンシューマー登録
                     channel.BasicConsume(queueName, true, consumer);
 
                     while (true)
                     {
                         // キャンセル待ち   
                         if (cancel.IsCancellationRequested)
                         {
                             break;
                         }
                     }
                 }
             })(factory, tokenSource.Token), tokenSource.Token);
 
             // Ctl+C待機
             Console.CancelKeyPress += (_, e) =>
             {
                 e.Cancel = true;
                 tokenSource.Cancel(); // Taskキャンセル
             };
 
             Task.WaitAll(pTask, cTask);
 
             Console.WriteLine("stop .Net RabbitMQ Example. press any key to close.");
             Console.ReadKey();
         }
     }
 
     public class SendMessage
     {
         public string Message { get; set; }
         public long Timestamp { get; set; }
     }
 
     public class ConsumedMessage
     {
         public string Message { get; set; }
         public long Timestamp { get; set; }
     }
 }

-参考
--RabbitMQでPublisherとConsumer(C#編)~
https://qiita.com/lightstaff/items/1a8a1e602d486571cb3a
--RabbitMQ tutorial - "Hello World!" — RabbitMQ~
https://www.rabbitmq.com/tutorials/tutorial-one-dotnet.html
--より洗練された実装~
https://github.com/OpenTouryoProject/DxCommon/tree/develop/Edge/RabbitMQ/Client/CS

***EasyNetQ [#df0bbf55]
-RabbitMQのための優れた.NET API
-初期の開発は、15belowがスポンサーとなって行われた。

***AMQPクライアント [#he029c15]
RabbitMQはサポートしない?

-AMQPNetLite
-Microsoft.Azure.Amqp
-...

*チュートリアル [#x8a94036]

**Windows [#ua9b4728]

***証明書の準備 [#l31f2bae]
[[Mosquittoのチュートリアル>Mosquitto#p4f5a191]]と同じ。

***SSLの設定 [#lc58e121]

-サーバー証明書を利用

--advanced.config
 [
   {rabbit,  [ 
     {ssl_listeners, [5671]},
     {ssl_options, [{cacertfile,"c:/certs/ca.crt"},
                   {certfile,"c:/certs/server.crt"},
                   {keyfile,"c:/certs/server.key"},
                   {verify,verify_none},
                   {fail_if_no_peer_cert,false}]}
   ]}
 ].

--サービスの再起動~
[[管理画面>#y14216dd]]でSSLポートを確認する。
 Listening ports
 amqp/ssl	0.0.0.0	5671
 amqp/ssl	::	5671

-パスワード認証~
[[管理画面>#y14216dd]]でユーザーを作成する。

-クライアント証明書を利用

--プラグインを有効化
 rabbitmq-plugins enable rabbitmq_auth_mechanism_ssl

--CNと一致したユーザーを作成する。

--advanced.config
 [
   {rabbit,  [ 
     {ssl_listeners, [5671]},
     {auth_mechanisms, ['EXTERNAL', 'PLAIN']},
     {ssl_cert_login_from, common_name},
     {ssl_options, [{cacertfile,"c:/certs/ca.crt"},
                   {certfile,"c:/certs/server.crt"},
                   {keyfile,"c:/certs/server.key"},
                   {verify,verify_peer},
                   {fail_if_no_peer_cert,true}]}
   ]}
 ].

--サービスの再起動~
[[管理画面>#y14216dd]]でSSLポートを確認する。
 Listening ports
 amqp/ssl	0.0.0.0	5671
 amqp/ssl	::	5671

***プログラムからアクセス [#bd32bdbe]

-認証局の証明書(ca.crt)は、[[証明書ストア>証明書#dccdb7e5]]の~
信頼されたルート証明機関に入れておく必要がある。

-サーバー証明書を利用

--以下を書き換える。
 using System.Net.Security; // 追加
 
 ...
 
 // ファクトリ生成
 var factory = new ConnectionFactory()
 {
   HostName = hostname,
   Ssl = new SslOption // 追加
   {
     Enabled = true,
     ServerName = hostname,
     AcceptablePolicyErrors = SslPolicyErrors.RemoteCertificateNameMismatch 
                              | SslPolicyErrors.RemoteCertificateChainErrors,
   }
 };

--デバッグ実行で動作確認~
動作した。

-パスワード認証を利用

--以下を書き換える。
 var factory = new ConnectionFactory()
 {
   HostName = hostname,
 
   UserName = "xxxxx", // 追加
   Password = "xxxxx", // 追加

--デバッグ実行で動作確認~
動作した。

-クライアント証明書を利用

--以下を書き換える。
 using System.Security.Cryptography.X509Certificates; // 追加
 
 ...
 
 X509Certificate2 cli = new X509Certificate2(@"C:\certs\client.pfx", "xxxxx"); // 追加
 
 // ファクトリ生成
 var factory = new ConnectionFactory()
 {
   HostName = hostname,
   //UserName = "xxxxx", // 削除
   //Password = "xxxxx", // 削除
   AuthMechanisms = new IAuthMechanismFactory[]{ new ExternalMechanismFactory()}, // 追加
   Ssl = new SslOption
   {
     Enabled = true,
     ServerName = hostname,
     AcceptablePolicyErrors = SslPolicyErrors.RemoteCertificateNameMismatch 
                              | SslPolicyErrors.RemoteCertificateChainErrors,
     Certs = new X509CertificateCollection(new X509Certificate[] { cli }) // 追加
   }
 };

--デバッグ実行で動作確認~
動作した(ssl_cert_login_fromを書く位置に注意!)。

**Linux [#jcd1f6f4]

*参考 [#y172abcc]

-RabbitMQ に入門してピョンピョンしてみた - kakakakakku blog~
https://kakakakakku.hatenablog.com/entry/2015/08/26/000853

**SIOS Tech. Lab [#g4a50d67]
-RabbitMQによる非同期処理~
https://tech-lab.sios.jp/archives/7902
-WindowsでRabbitMQ~
https://tech-lab.sios.jp/archives/7920
-Rest APIでRabbitMQ~
https://tech-lab.sios.jp/archives/7941
-.NETでRabbitMQ~
https://tech-lab.sios.jp/archives/8018

**Microsoft Docs [#dd5184c5]
-開発環境またはテスト環境の RabbitMQ でイベント バスを実装する~
https://docs.microsoft.com/ja-jp/dotnet/architecture/microservices/multi-container-microservice-net-applications/rabbitmq-event-bus-development-test-environment

-Azure Functions における Azure RabbitMQ バインド~
https://docs.microsoft.com/ja-jp/azure/azure-functions/functions-bindings-rabbitmq
-Azure Functions における RabbitMQ の出力バインド~
https://docs.microsoft.com/ja-jp/azure/azure-functions/functions-bindings-rabbitmq-output

**Qiita [#r64ad8af]
-RabbitMQをインストールする(Windows/Mac/Docker)~
https://qiita.com/suke_masa/items/5d583462727660663fde

-RabbitMQ でメッセージの送受信を行うまでの環境設定についてのメモ~
https://qiita.com/ptiringo/items/c554fa66f0d985394fed

----
Tags: [[:通信技術]], [[:.NET開発]], [[:IoT]]

トップ   編集 差分 バックアップ 添付 複製 名前変更 リロード   新規 一覧 単語検索 最終更新   ヘルプ   最終更新のRSS