「マイクロソフト系技術情報 Wiki」は、「Open棟梁Project」,「OSSコンソーシアム .NET開発基盤部会」によって運営されています。
目次 †
概要 †
- AMQPブローカーのOSS
- MQTT プロトコルサポートのプラグインもある。
詳細 †
Windows †
インストール †
ブローカー起動 †
- サービスとしてインストールされる。
- 起動:メニューから[RabbitMQ Service - start]を選択
- 停止:メニューから[RabbitMQ Service - stop]を選択
送受信 †
その他の環境 †
Linux †
Raspberry Pi †
Docker on Windows †
Docker on Raspberry Pi †
SaaS / PaaS †
管理ツール †
管理画面 †
- メニューから[RabbitMQ Command Prompt (sbin dir)]を選択
rabbitmqctl †
- 標準の管理ツールでvhost作成・ユーザ作成・権限付与などの管理作業を行う。
- .batファイルとして提供されている。
rabbitmqctl.bat
rabbitmq-plugins †
前述のrabbitmq-pluginsでプラグインを制御できる。
rabbitmqadmin †
- pythonが必要で、rabbitmqadminファイルを管理画面からダウンロードして、以下のように使用する。
... †
参考 †
設定ファイル †
新・旧の設定ファイル †
- 場所
インストール後、設定ファイルは存在しないので、必要なら、作成する必要がある。
- advanced.config
- 旧式の書き方。
- 新スタイルの設定形式では表現できない限られた設定項目。
- 旧ファイル
- rabbitmq.config
- 旧式の書き方。
- Erlang の標準設定ファイル書式で記述。
インポート・エクスポート †
管理ツール †
前述の管理ツールを使用して設定を行う。
認証 †
バーチャルホスト、キュー、エクスチェンジなどのリソースへのアクセスを承認する
パスワード †
- デフォルト・ユーザー
- ユーザー名「guest」パスワード「guest」。
- バーチャルホスト「/」へのフルアクセスを許可される。
- 規定ではホスト・ローカル接続にしか使用できない。
loopback_usersの設定をnoneにすることで、
リモートホストからの接続を許可することが可能。
- CLI
- 追加
rabbitmqctl add_user 'username' 'password'
- 更新
rabbitmqctl change_password
- 削除
rabbitmqctl delete_user 'username'
- 一覧
rabbitmqctl list_users
- アクセス許可設定
Exchange以下(Queueなど)のアクセス許可を設定できる。
(Topicの認可というのものあるらしい(既定では無効になっている))
・追加
# First ".*" for configure permission on every entity
# Second ".*" for write permission on every entity
# Third ".*" for read permission on every entity
rabbitmqctl set_permissions -p "custom-vhost" "username" ".*" ".*" ".*"
・削除
rabbitmqctl clear_permissions -p "custom-vhost" "username"
・一括
for v in $(rabbitmqctl list_vhosts --silent); do rabbitmqctl set_permissions -p $v "a-user" ".*" ".*" ".*"; done
- その他
- ノードブート時の定義のインポート
- 別の認証バックエンド(LDAP、HTTP、AMQPなど)
証明書 †
- サーバー認証
rabbitmq-auth-mechanism-sslを有効にして、
クライアントがEXTERNALメカニズムを使うように設定
- クライアント認証
- ユーザー名はCNと一致する。
- クライアントが提供するパスワードはすべて無視される。
ブローカーモデル †
AMQP 1.0 で削除されたので、コレは、純粋に、RabbitMQの話。
Producer †
送信する人
Consumer †
受信する人
Message †
送受信対象のメッセージ
Queue †
キュー
Exchange †
Queueへの配送ルール
- direct
routing keyとbinding keyが完全したMessageだけQueueに配送。
- fanout
binding keyを無視して全てのQueueにMessageを配送(ブロードキャスト)
- topic
routing keyとbinding keyで、部分一致したMessageをQueueに配送。
Vhosts (Virtual Hosts) †
- 固有の名前を持つ個々のコンテナ(論理的グループ)
- コンテナ中には、以下リソースがある。
送受信(C#) †
RabbitMQ.Client †
様々な言語向けにリリースされている。
↓ ↓ ↓
- Qiitaのサンプルをベースに弄った。
using Newtonsoft.Json;
using RabbitMQ;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Net.Security;
using System.Security.Cryptography.X509Certificates;
namespace ConsoleApp1
{
class Program
{
static void Main(string[] args)
{
// ホスト名
var hostname = "localhost";
// Taskキャンセルトークン
var tokenSource = new CancellationTokenSource();
Console.WriteLine($"start .Net RabbitMQ Example. Ctl+C to exit");
X509Certificate2 cli = new X509Certificate2(@"C:\certs\client.pfx", "xxxxx");
// ファクトリ生成
var factory = new ConnectionFactory()
{
HostName = hostname,
//UserName = "hogecli",
//Password = "hogecli",
AuthMechanisms = new IAuthMechanismFactory[] { new ExternalMechanismFactory() },
Ssl = new SslOption
{
Enabled = true,
ServerName = hostname,
AcceptablePolicyErrors = SslPolicyErrors.RemoteCertificateNameMismatch
| SslPolicyErrors.RemoteCertificateChainErrors,
Certs = new X509CertificateCollection(new X509Certificate[] { cli })
}
};
// パブリッシャータスク
var pTask = Task.Run(() => new Action<ConnectionFactory, CancellationToken>(async (f, cancel) => {
// コネクション&チャンネル生成
using (var conn = f.CreateConnection())
using (var channel = conn.CreateModel())
{
// Exchange生成
channel.ExchangeDeclare("test", "fanout", false, true);
while (true)
{
// キャンセル待ち
if (cancel.IsCancellationRequested)
{
break;
}
var msg = new SendMessage()
{
Message = "Hello",
Timestamp = DateTime.UtcNow.ToBinary()
};
var body = JsonConvert.SerializeObject(msg);
// Publish!!
try
{
channel.BasicPublish("test", "", null, Encoding.UTF8.GetBytes(body));
Console.WriteLine($"success send. message: {msg.Message}, timestamp: {msg.Timestamp}");
}
catch (Exception ex)
{
Console.WriteLine($"failer send. reason: {ex.Message}");
}
await Task.Delay(10000);
}
}
})(factory, tokenSource.Token), tokenSource.Token);
// コンシューマータスク
var cTask = Task.Run(() => new Action<ConnectionFactory, CancellationToken>((f, cancel) => {
// コネクション&チャンネル生成
using (var conn = f.CreateConnection())
using (var channel = conn.CreateModel())
{
// Exchange生成
channel.ExchangeDeclare("test", "fanout", false, true);
// Queue生成
var queueName = channel.QueueDeclare().QueueName;
// Bind Queue
channel.QueueBind(queueName, "test", "");
// コンシューマー生成
var consumer = new EventingBasicConsumer(channel);
// 受信イベント定義
consumer.Received += (_, ea) =>
{
var msg = JsonConvert.DeserializeObject<ConsumedMessage>(Encoding.UTF8.GetString(ea.Body.ToArray()));
Console.WriteLine($"success consumed. message: {msg.Message}, timestamp: {msg.Timestamp}");
};
// コンシューマー登録
channel.BasicConsume(queueName, true, consumer);
while (true)
{
// キャンセル待ち
if (cancel.IsCancellationRequested)
{
break;
}
}
}
})(factory, tokenSource.Token), tokenSource.Token);
// Ctl+C待機
Console.CancelKeyPress += (_, e) =>
{
e.Cancel = true;
tokenSource.Cancel(); // Taskキャンセル
};
Task.WaitAll(pTask, cTask);
Console.WriteLine("stop .Net RabbitMQ Example. press any key to close.");
Console.ReadKey();
}
}
public class SendMessage
{
public string Message { get; set; }
public long Timestamp { get; set; }
}
public class ConsumedMessage
{
public string Message { get; set; }
public long Timestamp { get; set; }
}
}
EasyNetQ †
- RabbitMQのための優れた.NET API
- 初期の開発は、15belowがスポンサーとなって行われた。
AMQPクライアント †
RabbitMQはサポートしない?
- AMQPNetLite?
- Microsoft.Azure.Amqp
- ...
チュートリアル †
Windows †
証明書の準備 †
Mosquittoのチュートリアルと同じ。
SSLの設定 †
- advanced.config
[
{rabbit, [
{ssl_listeners, [5671]},
{ssl_options, [{cacertfile,"c:/certs/ca.crt"},
{certfile,"c:/certs/server.crt"},
{keyfile,"c:/certs/server.key"},
{verify,verify_none},
{fail_if_no_peer_cert,false}]}
]}
].
- advanced.config
[
{rabbit, [
{ssl_listeners, [5671]},
{auth_mechanisms, ['EXTERNAL', 'PLAIN']},
{ssl_cert_login_from, common_name},
{ssl_options, [{cacertfile,"c:/certs/ca.crt"},
{certfile,"c:/certs/server.crt"},
{keyfile,"c:/certs/server.key"},
{verify,verify_peer},
{fail_if_no_peer_cert,true}]}
]}
].
プログラムからアクセス †
- 認証局の証明書(ca.crt)は、証明書ストアの
信頼されたルート証明機関に入れておく必要がある。
- 以下を書き換える。
using System.Net.Security; // 追加
...
// ファクトリ生成
var factory = new ConnectionFactory()
{
HostName = hostname,
Ssl = new SslOption // 追加
{
Enabled = true,
ServerName = hostname,
AcceptablePolicyErrors = SslPolicyErrors.RemoteCertificateNameMismatch
| SslPolicyErrors.RemoteCertificateChainErrors,
}
};
- 以下を書き換える。
var factory = new ConnectionFactory()
{
HostName = hostname,
UserName = "xxxxx", // 追加
Password = "xxxxx", // 追加
- 以下を書き換える。
using System.Security.Cryptography.X509Certificates; // 追加
...
X509Certificate2 cli = new X509Certificate2(@"C:\certs\client.pfx", "xxxxx"); // 追加
// ファクトリ生成
var factory = new ConnectionFactory()
{
HostName = hostname,
//UserName = "xxxxx", // 削除
//Password = "xxxxx", // 削除
AuthMechanisms = new IAuthMechanismFactory[]{ new ExternalMechanismFactory()}, // 追加
Ssl = new SslOption
{
Enabled = true,
ServerName = hostname,
AcceptablePolicyErrors = SslPolicyErrors.RemoteCertificateNameMismatch
| SslPolicyErrors.RemoteCertificateChainErrors,
Certs = new X509CertificateCollection(new X509Certificate[] { cli }) // 追加
}
};
- デバッグ実行で動作確認
動作した(ssl_cert_login_fromを書く位置に注意!)。
Linux †
参考 †
SIOS Tech. Lab †
Microsoft Docs †
Qiita †
Tags: :通信技術, :.NET開発, :IoT
|