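The "Microsoft Technology Information Wiki" (マイクロソフト系技術情報 Wiki) is operated by the Open棟梁 Project and the OSS Consortium .NET Development Infrastructure Working Group (OSSコンソーシアム .NET開発基盤部会).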
>rabbitmq-plugins enable rabbitmq_management ...
>rabbitmq-plugins enable rabbitmq_tracing ...
...RabbitMQ Server\rabbitmq_server-x.x.x\sbin
rabbitmqctl.bat
>rabbitmqctl(.bat) status
Plugins can be enabled and disabled with the rabbitmq-plugins command described above.
$ rabbitmqadmin ...subcommand...
>python.exe rabbitmqadmin ...subcommand...
rabbitmqadmin -u admin -p admin -V my-vhost get queue=my-q ackmode=ack_requeue_true
rabbitmqadmin get queue=my-q ackmode=ack_requeue_true
rabbitmqadmin -u admin -p admin -V my-vhost publish routing_key=my-key exchange=my-exchange payload=test
rabbitmqadmin publish routing_key=my-key exchange=my-exchange payload=test
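rabbitmqadmin is a client for the management plugin's HTTP API, so the publish command above corresponds to a single POST request. The following is a minimal sketch that only builds that request and does not send it. The base URL http://localhost:15672 and the admin/admin credentials are assumptions carried over from the examples above, and build_publish_request is a hypothetical helper name.

```python
import base64
import json
from urllib.parse import quote
from urllib.request import Request


def build_publish_request(vhost, exchange, routing_key, payload,
                          user="admin", password="admin",
                          base_url="http://localhost:15672"):
    """Build (but do not send) a management HTTP API publish request."""
    # Path segments must be percent-encoded; the default vhost "/" becomes "%2F".
    url = "{}/api/exchanges/{}/{}/publish".format(
        base_url, quote(vhost, safe=""), quote(exchange, safe=""))
    body = json.dumps({
        "properties": {},
        "routing_key": routing_key,
        "payload": payload,
        "payload_encoding": "string",
    }).encode("utf-8")
    # HTTP Basic authentication, as used by the management API.
    token = base64.b64encode("{}:{}".format(user, password).encode("utf-8"))
    headers = {
        "Content-Type": "application/json",
        "Authorization": "Basic " + token.decode("ascii"),
    }
    return Request(url, data=body, headers=headers, method="POST")


if __name__ == "__main__":
    req = build_publish_request("my-vhost", "my-exchange", "my-key", "test")
    print(req.get_method(), req.full_url)
    # Sending it would be: urllib.request.urlopen(req)
```

Passing the request to urllib.request.urlopen would perform the publish against a broker that has rabbitmq_management enabled.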
%APPDATA%\RabbitMQ\rabbitmq.conf
(for example, C:\Users\xxxxx\AppData\Roaming\RabbitMQ\rabbitmq.conf)
/etc/rabbitmq/rabbitmq.config
rabbitmqadmin export c:\export.json
management.load_definitions = c:\import.json
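The exported definitions file is a JSON document whose top-level sections (vhosts, queues, exchanges, bindings, and so on) are lists of objects, so it can be checked with a few lines of Python before it is imported. The sketch below assumes that layout; the sample document is a hypothetical, heavily trimmed stand-in for a real export, and summarize and queues_in_vhost are illustrative helper names.

```python
import json

# Hypothetical, heavily trimmed sample of an exported definitions file.
SAMPLE_DEFINITIONS = """
{
  "vhosts": [{"name": "my-vhost"}],
  "queues": [
    {"name": "my-q", "vhost": "my-vhost", "durable": true,
     "auto_delete": false, "arguments": {}}
  ],
  "exchanges": [
    {"name": "my-exchange", "vhost": "my-vhost", "type": "direct",
     "durable": true, "auto_delete": false, "internal": false,
     "arguments": {}}
  ],
  "bindings": [
    {"source": "my-exchange", "vhost": "my-vhost", "destination": "my-q",
     "destination_type": "queue", "routing_key": "my-key", "arguments": {}}
  ]
}
"""


def summarize(definitions):
    """Count the objects in each list-valued section of a definitions document."""
    return {key: len(value)
            for key, value in definitions.items()
            if isinstance(value, list)}


def queues_in_vhost(definitions, vhost):
    """Return the names of the queues declared in the given virtual host."""
    return [q["name"] for q in definitions.get("queues", [])
            if q.get("vhost") == vhost]


if __name__ == "__main__":
    defs = json.loads(SAMPLE_DEFINITIONS)
    print(summarize(defs))
    print(queues_in_vhost(defs, "my-vhost"))
```

For a real export, load the file with json.load instead of parsing the inline sample.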
Configure these settings with the management tools described above.
Authorizes access to resources such as virtual hosts, queues, and exchanges.
rabbitmqctl add_user 'username' 'password'
rabbitmqctl change_password 'username' 'new-password'
rabbitmqctl list_users
# First ".*" for configure permission on every entity
# Second ".*" for write permission on every entity
# Third ".*" for read permission on every entity
rabbitmqctl set_permissions -p "custom-vhost" "username" ".*" ".*" ".*"
・Revoke
rabbitmqctl clear_permissions -p "custom-vhost" "username"
・Bulk (every virtual host)
for v in $(rabbitmqctl list_vhosts --silent); do rabbitmqctl set_permissions -p $v "a-user" ".*" ".*" ".*"; done
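The three arguments to set_permissions are regular expressions that are matched against resource names, one each for the configure, write, and read operations. The sketch below illustrates how such patterns decide access. It assumes an unanchored match and assumes that an empty pattern behaves like "^$" (matches nothing but the empty name); it uses Python's re module as a stand-in for the broker's own regex engine, and is_allowed is a hypothetical helper name.

```python
import re


def is_allowed(permissions, operation, resource_name):
    """Return True if the pattern for `operation` matches `resource_name`.

    `permissions` maps "configure", "write" and "read" to regex strings,
    in the same order as the arguments of `rabbitmqctl set_permissions`.
    """
    pattern = permissions[operation]
    if pattern == "":
        # Assumption: an empty pattern grants nothing.
        pattern = "^$"
    # Unanchored search: ".*" matches every name, "^my-" only a prefix.
    return re.search(pattern, resource_name) is not None


if __name__ == "__main__":
    full_access = {"configure": ".*", "write": ".*", "read": ".*"}
    restricted = {"configure": "^$", "write": "^my-", "read": ".*"}

    print(is_allowed(full_access, "configure", "my-q"))     # True
    print(is_allowed(restricted, "configure", "my-q"))      # False
    print(is_allowed(restricted, "write", "my-exchange"))   # True
    print(is_allowed(restricted, "write", "other"))         # False
```

Granting ".*" for all three, as in the commands above, therefore allows every operation on every resource in that virtual host.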
This was removed in AMQP 1.0, so what follows is purely specific to RabbitMQ.
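The party that sends messages
The party that receives messages
The message being sent and received
The queue
The rules for routing messages to a queue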
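The roles listed above can be sketched as a toy in-memory model: a sender publishes to an exchange, the exchange applies its routing rule to the bindings, and a receiver takes messages from a queue. This is only an illustration of the idea, not how RabbitMQ is implemented; the Broker class and its method names are hypothetical, and only the fanout and direct routing rules are covered.

```python
from collections import defaultdict, deque


class Broker:
    """Toy in-memory stand-in for the exchange -> binding -> queue model."""

    def __init__(self):
        self.exchanges = {}                # exchange name -> exchange type
        self.bindings = defaultdict(list)  # exchange name -> [(queue, key)]
        self.queues = {}                   # queue name -> deque of messages

    def declare_exchange(self, name, kind):
        if kind not in ("fanout", "direct"):
            raise ValueError("only fanout and direct are sketched here")
        self.exchanges[name] = kind

    def declare_queue(self, name):
        self.queues.setdefault(name, deque())

    def bind(self, queue, exchange, routing_key=""):
        self.bindings[exchange].append((queue, routing_key))

    def publish(self, exchange, routing_key, message):
        """Sender side: route a message and return how many queues got it."""
        kind = self.exchanges[exchange]
        delivered = 0
        for queue, binding_key in self.bindings[exchange]:
            # fanout ignores the key; direct requires an exact key match.
            if kind == "fanout" or binding_key == routing_key:
                self.queues[queue].append(message)
                delivered += 1
        return delivered

    def get(self, queue):
        """Receiver side: take the oldest message, or None if the queue is empty."""
        q = self.queues[queue]
        return q.popleft() if q else None


if __name__ == "__main__":
    broker = Broker()
    broker.declare_exchange("my-exchange", "direct")
    broker.declare_queue("my-q")
    broker.bind("my-q", "my-exchange", "my-key")
    print(broker.publish("my-exchange", "my-key", "test"))  # 1
    print(broker.get("my-q"))                               # test
```

A message published with a routing key that matches no binding on a direct exchange is delivered to no queue, which is why publish returns the delivery count.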
Releases are available for a variety of languages.
↓ ↓ ↓
...
This is an AMQP client, but it may not support RabbitMQ (unconfirmed).
[
  {rabbit, [
    {ssl_listeners, [5671]},
    {ssl_options, [{cacertfile, "c:/certs/ca.crt"},
                   {certfile, "c:/certs/server.crt"},
                   {keyfile, "c:/certs/server.key"},
                   {verify, verify_none},
                   {fail_if_no_peer_cert, false}]}
  ]}
].
Listening ports
amqp/ssl    0.0.0.0    5671
amqp/ssl    ::         5671
rabbitmq-plugins enable rabbitmq_auth_mechanism_ssl
[
  {rabbit, [
    {ssl_listeners, [5671]},
    {auth_mechanisms, ['EXTERNAL', 'PLAIN']},
    {ssl_cert_login_from, common_name},
    {ssl_options, [{cacertfile, "c:/certs/ca.crt"},
                   {certfile, "c:/certs/server.crt"},
                   {keyfile, "c:/certs/server.key"},
                   {verify, verify_peer},
                   {fail_if_no_peer_cert, true}]}
  ]}
].
Listening ports
amqp/ssl    0.0.0.0    5671
amqp/ssl    ::         5671
using System.Net.Security; // added
...
// Create the factory
var factory = new ConnectionFactory()
{
    HostName = hostname,
    Ssl = new SslOption // added
    {
        Enabled = true,
        ServerName = hostname,
        AcceptablePolicyErrors =
            SslPolicyErrors.RemoteCertificateNameMismatch |
            SslPolicyErrors.RemoteCertificateChainErrors,
    }
};
var factory = new ConnectionFactory()
{
    HostName = hostname,
    UserName = "xxxxx", // added
    Password = "xxxxx", // added
    ...
};
using System.Security.Cryptography.X509Certificates; // added
...
X509Certificate2 cli = new X509Certificate2(@"C:\certs\client.pfx", "xxxxx"); // added
// Create the factory
var factory = new ConnectionFactory()
{
    HostName = hostname,
    //UserName = "xxxxx", // removed
    //Password = "xxxxx", // removed
    AuthMechanisms = new IAuthMechanismFactory[] { new ExternalMechanismFactory() }, // added
    Ssl = new SslOption
    {
        Enabled = true,
        ServerName = hostname,
        AcceptablePolicyErrors =
            SslPolicyErrors.RemoteCertificateNameMismatch |
            SslPolicyErrors.RemoteCertificateChainErrors,
        Certs = new X509CertificateCollection(new X509Certificate[] { cli }) // added
    }
};
The following was adapted from a RabbitMQ.Client sample published on Qiita.
using Newtonsoft.Json;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Net.Security;
using System.Security.Cryptography.X509Certificates;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApp1
{
    class Program
    {
        static void Main(string[] args)
        {
            // Host name
            var hostname = "localhost";

            // Cancellation token shared by both tasks
            var tokenSource = new CancellationTokenSource();
            var token = tokenSource.Token;

            Console.WriteLine("start .NET RabbitMQ Example. Ctrl+C to exit");

            // Client certificate used for EXTERNAL authentication
            X509Certificate2 cli = new X509Certificate2(@"C:\certs\client.pfx", "xxxxx");

            // Create the factory
            var factory = new ConnectionFactory()
            {
                HostName = hostname,
                //UserName = "hogecli",
                //Password = "hogecli",
                AuthMechanisms = new IAuthMechanismFactory[] { new ExternalMechanismFactory() },
                Ssl = new SslOption
                {
                    Enabled = true,
                    ServerName = hostname,
                    AcceptablePolicyErrors =
                        SslPolicyErrors.RemoteCertificateNameMismatch |
                        SslPolicyErrors.RemoteCertificateChainErrors,
                    Certs = new X509CertificateCollection(new X509Certificate[] { cli })
                }
            };

            // Publisher task. The async lambda is passed to Task.Run directly so that
            // the returned Task completes only when the publish loop ends
            // (wrapping it in an Action would make it async void).
            var pTask = Task.Run(async () =>
            {
                // Create the connection and channel
                using (var conn = factory.CreateConnection())
                using (var channel = conn.CreateModel())
                {
                    // Declare the exchange (non-durable, auto-delete)
                    channel.ExchangeDeclare("test", "fanout", false, true);

                    while (!token.IsCancellationRequested)
                    {
                        var msg = new SendMessage()
                        {
                            Message = "Hello",
                            Timestamp = DateTime.UtcNow.ToBinary()
                        };
                        var body = JsonConvert.SerializeObject(msg);

                        // Publish
                        try
                        {
                            channel.BasicPublish("test", "", null, Encoding.UTF8.GetBytes(body));
                            Console.WriteLine($"success send. message: {msg.Message}, timestamp: {msg.Timestamp}");
                        }
                        catch (Exception ex)
                        {
                            Console.WriteLine($"failed to send. reason: {ex.Message}");
                        }

                        // Wait 10 seconds, but stop immediately on cancellation
                        try
                        {
                            await Task.Delay(10000, token);
                        }
                        catch (OperationCanceledException)
                        {
                            break;
                        }
                    }
                }
            });

            // Consumer task
            var cTask = Task.Run(() =>
            {
                // Create the connection and channel
                using (var conn = factory.CreateConnection())
                using (var channel = conn.CreateModel())
                {
                    // Declare the exchange (non-durable, auto-delete)
                    channel.ExchangeDeclare("test", "fanout", false, true);

                    // Declare a server-named queue
                    var queueName = channel.QueueDeclare().QueueName;

                    // Bind the queue to the exchange
                    channel.QueueBind(queueName, "test", "");

                    // Create the consumer
                    var consumer = new EventingBasicConsumer(channel);

                    // Define the receive handler
                    consumer.Received += (_, ea) =>
                    {
                        var msg = JsonConvert.DeserializeObject<ConsumedMessage>(
                            Encoding.UTF8.GetString(ea.Body.ToArray()));
                        Console.WriteLine($"success consumed. message: {msg.Message}, timestamp: {msg.Timestamp}");
                    };

                    // Register the consumer (autoAck = true)
                    channel.BasicConsume(queueName, true, consumer);

                    // Block until cancellation instead of spinning in a busy loop
                    token.WaitHandle.WaitOne();
                }
            });

            // Wait for Ctrl+C
            Console.CancelKeyPress += (_, e) =>
            {
                e.Cancel = true;
                tokenSource.Cancel(); // cancel both tasks
            };

            Task.WaitAll(pTask, cTask);

            Console.WriteLine("stop .NET RabbitMQ Example. press any key to close.");
            Console.ReadKey();
        }
    }

    public class SendMessage
    {
        public string Message { get; set; }
        public long Timestamp { get; set; }
    }

    public class ConsumedMessage
    {
        public string Message { get; set; }
        public long Timestamp { get; set; }
    }
}