マイクロソフト系技術情報 Wiki」は、「Open棟梁Project」,「OSSコンソーシアム .NET開発基盤部会」によって運営されています。

目次

概要

詳細

Windows

インストール

ブローカー起動

送受信

その他の環境

Linux

Raspberry Pi

Docker on Windows

Docker on Raspberry Pi

SaaS/PaaS

管理ツール

管理画面

rabbitmqctl

rabbitmq-plugins

前述のrabbitmq-pluginsでプラグインを制御できる。

rabbitmqadmin

...

参考

設定ファイル

新・旧の設定ファイル

インポート・エクスポート

管理ツール

前述の管理ツールを使用して設定を行う。

認証

バーチャルホスト、キュー、エクスチェンジなどのリソースへのアクセスを承認する

パスワード

証明書

ブローカーモデル

AMQP 1.0 で削除されたので、コレは、純粋に、RabbitMQの話。

Producer

送信する人

Consumer

受信する人

Message

送受信対象のメッセージ

Queue

キュー

Exchange

Queueへの配送ルール

Vhosts (Virtual Hosts)

送受信(C#)

RabbitMQ.Client

様々な言語向けにリリースされている。

↓ ↓ ↓

EasyNetQ

...

RabbitMQはサポートしない?

AMQPクライアントだが、RabbitMQはサポートしない?

チュートリアル

Windows

証明書の準備

Mosquittoのチュートリアルと同じ。

SSLの設定

プログラムからアクセス

Linux

プログラム例

QiitaのRabbitMQ.Clientのサンプルをベースに弄った。

using Newtonsoft.Json;
using RabbitMQ;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Net.Security;
using System.Security.Cryptography.X509Certificates;

namespace ConsoleApp1
{
    class Program
    {
        static void Main(string[] args)
        {
            // ホスト名
            var hostname = "localhost";

            // Taskキャンセルトークン
            var tokenSource = new CancellationTokenSource();

            Console.WriteLine($"start .Net RabbitMQ Example. Ctl+C to exit");

            X509Certificate2 cli = new X509Certificate2(@"C:\certs\client.pfx", "xxxxx");

            // ファクトリ生成
            var factory = new ConnectionFactory()
            {
                HostName = hostname,
                //UserName = "hogecli",
                //Password = "hogecli",
                AuthMechanisms = new IAuthMechanismFactory[] { new ExternalMechanismFactory() },
                Ssl = new SslOption
                {
                    Enabled = true,
                    ServerName = hostname,
                    AcceptablePolicyErrors = SslPolicyErrors.RemoteCertificateNameMismatch 
                                             | SslPolicyErrors.RemoteCertificateChainErrors,
                    Certs = new X509CertificateCollection(new X509Certificate[] { cli })
                }
            };

            // パブリッシャータスク
            var pTask = Task.Run(() => new Action<ConnectionFactory, CancellationToken>(async (f, cancel) => {
                // コネクション&チャンネル生成
                using (var conn = f.CreateConnection())
                using (var channel = conn.CreateModel())
                {
                    // Exchange生成
                    channel.ExchangeDeclare("test", "fanout", false, true);

                    while (true)
                    {
                        // キャンセル待ち
                        if (cancel.IsCancellationRequested)
                        {
                            break;
                        }

                        var msg = new SendMessage()
                        {
                            Message = "Hello",
                            Timestamp = DateTime.UtcNow.ToBinary()
                        };
                        var body = JsonConvert.SerializeObject(msg);

                        // Publish!!
                        try
                        {
                            channel.BasicPublish("test", "", null, Encoding.UTF8.GetBytes(body));
                            Console.WriteLine($"success send. message: {msg.Message}, timestamp: {msg.Timestamp}");
                        }
                        catch (Exception ex)
                        {
                            Console.WriteLine($"failer send. reason: {ex.Message}");
                        }

                        await Task.Delay(10000);
                    }
                }
            })(factory, tokenSource.Token), tokenSource.Token);

            // コンシューマータスク
            var cTask = Task.Run(() => new Action<ConnectionFactory, CancellationToken>((f, cancel) => {
                // コネクション&チャンネル生成
                using (var conn = f.CreateConnection())
                using (var channel = conn.CreateModel())
                {
                    // Exchange生成
                    channel.ExchangeDeclare("test", "fanout", false, true);

                    // Queue生成
                    var queueName = channel.QueueDeclare().QueueName;

                    // Bind Queue
                    channel.QueueBind(queueName, "test", "");

                    // コンシューマー生成
                    var consumer = new EventingBasicConsumer(channel);

                    // 受信イベント定義
                    consumer.Received += (_, ea) =>
                    {
                        var msg = JsonConvert.DeserializeObject<ConsumedMessage>(Encoding.UTF8.GetString(ea.Body.ToArray()));
                        Console.WriteLine($"success consumed. message: {msg.Message}, timestamp: {msg.Timestamp}");
                    };

                    // コンシューマー登録
                    channel.BasicConsume(queueName, true, consumer);

                    while (true)
                    {
                        // キャンセル待ち   
                        if (cancel.IsCancellationRequested)
                        {
                            break;
                        }
                    }
                }
            })(factory, tokenSource.Token), tokenSource.Token);

            // Ctl+C待機
            Console.CancelKeyPress += (_, e) =>
            {
                e.Cancel = true;
                tokenSource.Cancel(); // Taskキャンセル
            };

            Task.WaitAll(pTask, cTask);

            Console.WriteLine("stop .Net RabbitMQ Example. press any key to close.");
            Console.ReadKey();
        }
    }

    public class SendMessage
    {
        public string Message { get; set; }
        public long Timestamp { get; set; }
    }

    public class ConsumedMessage
    {
        public string Message { get; set; }
        public long Timestamp { get; set; }
    }
}

参考

SIOS Tech. Lab

Microsoft Docs

Qiita


Tags: :通信技術, :.NET開発, :IoT


トップ   新規 一覧 単語検索 最終更新   ヘルプ   最終更新のRSS