マイクロソフト系技術情報 Wiki」は、「Open棟梁Project」,「OSSコンソーシアム .NET開発基盤部会」によって運営されています。

目次

概要

  • AMQPブローカーのOSS
  • MQTT プロトコルサポートのプラグインもある。

詳細

Windows

インストール

  • Erlang
  • インストール(既定値)
  • 環境変数設定
    不要だった。
  • RabbitMQ

ブローカー起動

  • サービスとしてインストールされる。
  • 起動:メニューから[RabbitMQ Service - start]を選択
  • 停止:メニューから[RabbitMQ Service - stop]を選択

送受信

その他の環境

Linux

Raspberry Pi

Docker on Windows

Docker on Raspberry Pi

SaaS / PaaS

  • Azure:Azure Service Bus(ブリッジ機能)
  • Azure:Amazon MQ for RabbitMQ
  • GCP:Cloud Pub/Sub?

管理ツール

管理画面

  • 設定
  • メニューから[RabbitMQ Command Prompt (sbin dir)]を選択
  • 以下の2つのコマンドを実行する。
  • >rabbitmq-plugins enable rabbitmq_management
    ...
  • >rabbitmq-plugins enable rabbitmq_tracing
    ...
  • アクセス
  • アカウントは
    guest/guest

rabbitmqctl

  • 標準の管理ツールでvhost作成・ユーザ作成・権限付与などの管理作業を行う。
  • Windowsの場合
    以下から使用する。
  • 格納場所は以下のディレクトリ。
    ...RabbitMQ Server\rabbitmq_server-x.x.x\sbin
  • .batファイルとして提供されている。
    rabbitmqctl.bat
  • メニューから[RabbitMQ Command Prompt (sbin dir)]を選択し、.batは省略可能して実行可能。
    >rabbitmqctl(.bat) status

rabbitmq-plugins

前述のrabbitmq-pluginsでプラグインを制御できる。

rabbitmqadmin

  • pythonが必要で、rabbitmqadminファイルを管理画面からダウンロードして、以下のように使用する。
  • Linuxルの場合
    rabbitmqadminファイルを/usr/local/binにコピーし、
    $ rabbitmqadmin ...サブ・コマンド...
  • Windowsの場合 rabbitmqadminファイルをコピーし(sbin dirが良いかと)、
    >python.exe rabbitmqadmin ...サブ・コマンド...
  • Messageの送受信
  • 受信
    • rabbitmqadmin -u admin -p admin -V my-vhost get queue=my-q ackmode=ack_requeue_true
    • 初期値
      rabbitmqadmin get queue=my-q ackmode=ack_requeue_true
  • 送信
    • rabbitmqadmin -u admin -p admin -V my-vhost publish routing_key=my-key exchange=my-exchange payload=test
    • 初期値
      rabbitmqadmin publish routing_key=my-key exchange=my-exchange payload=test

...

参考

設定ファイル

新・旧の設定ファイル

  • 場所
    インストール後、設定ファイルは存在しないので、必要なら、作成する必要がある。
  • Windows:
    %APPDATA%\RabbitMQ\rabbitmq.conf
    C:\Users\xxxxx\AppData\Roaming\RabbitMQ\rabbitmq.conf
  • Linux:
    /etc/rabbitmq/rabbitmq.config
  • 新ファイル
    • rabbitmq.conf
      • 新式の書き方。
      • sysctl 方式で記述。
  • advanced.config
    • 旧式の書き方。
    • 新スタイルの設定形式では表現できない限られた設定項目。
  • 旧ファイル
    • rabbitmq.config
      • 旧式の書き方。
      • Erlang の標準設定ファイル書式で記述。

インポート・エクスポート

  • エクスポート
    rabbitmqadmin export c:\export.json
  • インポート
    rabbitmq.confに以下を追記する。
    management.load_definitions = c:\import.json

管理ツール

前述の管理ツールを使用して設定を行う。

認証

バーチャルホスト、キュー、エクスチェンジなどのリソースへのアクセスを承認する

パスワード

  • デフォルト・ユーザー
    • ユーザー名「guest」パスワード「guest」。
    • バーチャルホスト「/」へのフルアクセスを許可される。
    • 規定ではホスト・ローカル接続にしか使用できない。
      loopback_usersの設定をnoneにすることで、
      リモートホストからの接続を許可することが可能。
  • ユーザの管理
  • CLI
    • 追加
      rabbitmqctl add_user 'username' 'password'
    • 更新
      rabbitmqctl change_password
    • 削除  rabbitmqctl delete_user 'username'
    • 一覧
      rabbitmqctl list_users
  • アクセス許可設定
    Exchange以下(Queueなど)のアクセス許可を設定できる。
    (Topicの認可というのものあるらしい(既定では無効になっている))
    ・追加
    # First ".*" for configure permission on every entity
    # Second ".*" for write permission on every entity
    # Third ".*" for read permission on every entity
    rabbitmqctl set_permissions -p "custom-vhost" "username" ".*" ".*" ".*"
    ・削除
    rabbitmqctl clear_permissions -p "custom-vhost" "username"
    ・一括
    for v in $(rabbitmqctl list_vhosts --silent); do rabbitmqctl set_permissions -p $v "a-user" ".*" ".*" ".*"; done
  • その他
    • ノードブート時の定義のインポート
    • 別の認証バックエンド(LDAP、HTTP、AMQPなど)

証明書

  • サーバー認証とクライアント認証の両方が可能。
  • サーバー認証
    rabbitmq-auth-mechanism-sslを有効にして、
    クライアントがEXTERNALメカニズムを使うように設定
  • クライアント認証
    • ユーザー名はCNと一致する。
    • クライアントが提供するパスワードはすべて無視される。
  • 参考

ブローカーモデル

AMQP 1.0 で削除されたので、コレは、純粋に、RabbitMQの話。

Producer

送信する人

Consumer

受信する人

Message

送受信対象のメッセージ

Queue

キュー

Exchange

Queueへの配送ルール

  • direct
    routing keyとbinding keyが完全したMessageだけQueueに配送。
  • fanout
    binding keyを無視して全てのQueueにMessageを配送(ブロードキャスト)
  • topic
    routing keyとbinding keyで、部分一致したMessageをQueueに配送。

Vhosts (Virtual Hosts)

  • 固有の名前を持つ個々のコンテナ(論理的グループ)
  • コンテナ中には、以下リソースがある。
    • Connection
    • Exchange
    • Queue
    • Binding
    • ユーザー権限
    • およびその他のシステムリソース

送受信(C#)

RabbitMQ.Client

様々な言語向けにリリースされている。

  • 以下のコードは.NET Coreでも大方動く。
  • 一箇所、変更した。
  • ea.Body

↓ ↓ ↓

  • ea.Body.ToArray?()

EasyNetQ

...

RabbitMQはサポートしない?

AMQPクライアントだが、RabbitMQはサポートしない?

  • AMQPNetLite?
  • Microsoft.Azure.Amqp
  • ...

チュートリアル

Windows

証明書の準備

Mosquittoのチュートリアルと同じ。

SSLの設定

  • サーバー証明書を利用
  • advanced.config
    [
      {rabbit,  [ 
        {ssl_listeners, [5671]},
        {ssl_options, [{cacertfile,"c:/certs/ca.crt"},
                      {certfile,"c:/certs/server.crt"},
                      {keyfile,"c:/certs/server.key"},
                      {verify,verify_none},
                      {fail_if_no_peer_cert,false}]}
      ]}
    ].
  • サービスの再起動
    管理画面でSSLポートを確認する。
    Listening ports
    amqp/ssl	0.0.0.0	5671
    amqp/ssl	::	5671
  • パスワード認証
    管理画面でユーザーを作成する。
  • クライアント証明書を利用
  • プラグインを有効化
    rabbitmq-plugins enable rabbitmq_auth_mechanism_ssl
  • CNと一致したユーザーを作成する。
  • advanced.config
    [
      {rabbit,  [ 
        {ssl_listeners, [5671]},
        {auth_mechanisms, ['EXTERNAL', 'PLAIN']},
        {ssl_cert_login_from, common_name},
        {ssl_options, [{cacertfile,"c:/certs/ca.crt"},
                      {certfile,"c:/certs/server.crt"},
                      {keyfile,"c:/certs/server.key"},
                      {verify,verify_peer},
                      {fail_if_no_peer_cert,true}]}
      ]}
    ].
  • サービスの再起動
    管理画面でSSLポートを確認する。
    Listening ports
    amqp/ssl	0.0.0.0	5671
    amqp/ssl	::	5671

プログラムからアクセス

  • 認証局の証明書(ca.crt)は、証明書ストア
    信頼されたルート証明機関に入れておく必要がある。
  • サーバー証明書を利用
  • 以下を書き換える。
    using System.Net.Security; // 追加
    
    ...
    
    // ファクトリ生成
    var factory = new ConnectionFactory()
    {
      HostName = hostname,
      Ssl = new SslOption // 追加
      {
        Enabled = true,
        ServerName = hostname,
        AcceptablePolicyErrors = SslPolicyErrors.RemoteCertificateNameMismatch 
                                 | SslPolicyErrors.RemoteCertificateChainErrors,
      }
    };
  • デバッグ実行で動作確認
    動作した。
  • パスワード認証を利用
  • 以下を書き換える。
    var factory = new ConnectionFactory()
    {
      HostName = hostname,
    
      UserName = "xxxxx", // 追加
      Password = "xxxxx", // 追加
  • デバッグ実行で動作確認
    動作した。
  • クライアント証明書を利用
  • 以下を書き換える。
    using System.Security.Cryptography.X509Certificates; // 追加
    
    ...
    
    X509Certificate2 cli = new X509Certificate2(@"C:\certs\client.pfx", "xxxxx"); // 追加
    
    // ファクトリ生成
    var factory = new ConnectionFactory()
    {
      HostName = hostname,
      //UserName = "xxxxx", // 削除
      //Password = "xxxxx", // 削除
      AuthMechanisms = new IAuthMechanismFactory[]{ new ExternalMechanismFactory()}, // 追加
      Ssl = new SslOption
      {
        Enabled = true,
        ServerName = hostname,
        AcceptablePolicyErrors = SslPolicyErrors.RemoteCertificateNameMismatch 
                                 | SslPolicyErrors.RemoteCertificateChainErrors,
        Certs = new X509CertificateCollection(new X509Certificate[] { cli }) // 追加
      }
    };
  • デバッグ実行で動作確認
    動作した(ssl_cert_login_fromを書く位置に注意!)。

Linux

プログラム例

QiitaのRabbitMQ.Clientのサンプルをベースに弄った。

using Newtonsoft.Json;
using RabbitMQ;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Net.Security;
using System.Security.Cryptography.X509Certificates;

namespace ConsoleApp1
{
    class Program
    {
        static void Main(string[] args)
        {
            // ホスト名
            var hostname = "localhost";

            // Taskキャンセルトークン
            var tokenSource = new CancellationTokenSource();

            Console.WriteLine($"start .Net RabbitMQ Example. Ctl+C to exit");

            X509Certificate2 cli = new X509Certificate2(@"C:\certs\client.pfx", "xxxxx");

            // ファクトリ生成
            var factory = new ConnectionFactory()
            {
                HostName = hostname,
                //UserName = "hogecli",
                //Password = "hogecli",
                AuthMechanisms = new IAuthMechanismFactory[] { new ExternalMechanismFactory() },
                Ssl = new SslOption
                {
                    Enabled = true,
                    ServerName = hostname,
                    AcceptablePolicyErrors = SslPolicyErrors.RemoteCertificateNameMismatch 
                                             | SslPolicyErrors.RemoteCertificateChainErrors,
                    Certs = new X509CertificateCollection(new X509Certificate[] { cli })
                }
            };

            // パブリッシャータスク
            var pTask = Task.Run(() => new Action<ConnectionFactory, CancellationToken>(async (f, cancel) => {
                // コネクション&チャンネル生成
                using (var conn = f.CreateConnection())
                using (var channel = conn.CreateModel())
                {
                    // Exchange生成
                    channel.ExchangeDeclare("test", "fanout", false, true);

                    while (true)
                    {
                        // キャンセル待ち
                        if (cancel.IsCancellationRequested)
                        {
                            break;
                        }

                        var msg = new SendMessage()
                        {
                            Message = "Hello",
                            Timestamp = DateTime.UtcNow.ToBinary()
                        };
                        var body = JsonConvert.SerializeObject(msg);

                        // Publish!!
                        try
                        {
                            channel.BasicPublish("test", "", null, Encoding.UTF8.GetBytes(body));
                            Console.WriteLine($"success send. message: {msg.Message}, timestamp: {msg.Timestamp}");
                        }
                        catch (Exception ex)
                        {
                            Console.WriteLine($"failer send. reason: {ex.Message}");
                        }

                        await Task.Delay(10000);
                    }
                }
            })(factory, tokenSource.Token), tokenSource.Token);

            // コンシューマータスク
            var cTask = Task.Run(() => new Action<ConnectionFactory, CancellationToken>((f, cancel) => {
                // コネクション&チャンネル生成
                using (var conn = f.CreateConnection())
                using (var channel = conn.CreateModel())
                {
                    // Exchange生成
                    channel.ExchangeDeclare("test", "fanout", false, true);

                    // Queue生成
                    var queueName = channel.QueueDeclare().QueueName;

                    // Bind Queue
                    channel.QueueBind(queueName, "test", "");

                    // コンシューマー生成
                    var consumer = new EventingBasicConsumer(channel);

                    // 受信イベント定義
                    consumer.Received += (_, ea) =>
                    {
                        var msg = JsonConvert.DeserializeObject<ConsumedMessage>(Encoding.UTF8.GetString(ea.Body.ToArray()));
                        Console.WriteLine($"success consumed. message: {msg.Message}, timestamp: {msg.Timestamp}");
                    };

                    // コンシューマー登録
                    channel.BasicConsume(queueName, true, consumer);

                    while (true)
                    {
                        // キャンセル待ち   
                        if (cancel.IsCancellationRequested)
                        {
                            break;
                        }
                    }
                }
            })(factory, tokenSource.Token), tokenSource.Token);

            // Ctl+C待機
            Console.CancelKeyPress += (_, e) =>
            {
                e.Cancel = true;
                tokenSource.Cancel(); // Taskキャンセル
            };

            Task.WaitAll(pTask, cTask);

            Console.WriteLine("stop .Net RabbitMQ Example. press any key to close.");
            Console.ReadKey();
        }
    }

    public class SendMessage
    {
        public string Message { get; set; }
        public long Timestamp { get; set; }
    }

    public class ConsumedMessage
    {
        public string Message { get; set; }
        public long Timestamp { get; set; }
    }
}

参考

SIOS Tech. Lab

Microsoft Docs

Qiita


Tags: :通信技術, :.NET開発, :IoT


トップ   編集 凍結 差分 バックアップ 添付 複製 名前変更 リロード   新規 一覧 単語検索 最終更新   ヘルプ   最終更新のRSS
Last-modified: 2021-08-30 (月) 09:15:57 (23d)