「マイクロソフト系技術情報 Wiki」は、「Open棟梁Project」,「OSSコンソーシアム .NET開発基盤部会」によって運営されています。
>rabbitmq-plugins enable rabbitmq_management ...
>rabbitmq-plugins enable rabbitmq_tracing ...
...RabbitMQ Server\rabbitmq_server-x.x.x\sbin
rabbitmqctl.bat
>rabbitmqctl(.bat) status
前述のrabbitmq-pluginsでプラグインを制御できる。
$ rabbitmqadmin ...サブ・コマンド...
>python.exe rabbitmqadmin ...サブ・コマンド...
rabbitmqadmin -u admin -p admin -V my-vhost get queue=my-q ackmode=ack_requeue_true
rabbitmqadmin get queue=my-q ackmode=ack_requeue_true
rabbitmqadmin -u admin -p admin -V my-vhost publish routing_key=my-key exchange=my-exchange payload=test
rabbitmqadmin publish routing_key=my-key exchange=my-exchange payload=test
%APPDATA%\RabbitMQ\rabbitmq.conf C:\Users\xxxxx\AppData\Roaming\RabbitMQ\rabbitmq.conf
/etc/rabbitmq/rabbitmq.config
rabbitmqadmin export c:\export.json
management.load_definitions = c:\import.json
前述の管理ツールを使用して設定を行う。
バーチャルホスト、キュー、エクスチェンジなどのリソースへのアクセスを承認する
rabbitmqctl add_user 'username' 'password'
rabbitmqctl change_password 'username' 'newpassword'
rabbitmqctl delete_user 'username'
rabbitmqctl list_users
# First ".*" for configure permission on every entity
# Second ".*" for write permission on every entity
# Third ".*" for read permission on every entity
rabbitmqctl set_permissions -p "custom-vhost" "username" ".*" ".*" ".*"
・削除
rabbitmqctl clear_permissions -p "custom-vhost" "username"
・一括
for v in $(rabbitmqctl list_vhosts --silent); do rabbitmqctl set_permissions -p $v "a-user" ".*" ".*" ".*"; done
AMQP 1.0 で削除されたので、コレは、純粋に、RabbitMQの話。
送信する人
受信する人
送受信対象のメッセージ
キュー
Queueへの配送ルール
様々な言語向けにリリースされている。
↓ ↓ ↓
using Newtonsoft.Json;
using RabbitMQ;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Net.Security;
using System.Security.Cryptography.X509Certificates;
namespace ConsoleApp1
{
/// <summary>
/// Console sample that publishes a JSON message to the "test" fanout exchange
/// every ten seconds and, in parallel, consumes the same exchange through a
/// server-named queue. Both workers run until Ctrl+C is pressed.
/// </summary>
class Program
{
    /// <summary>
    /// Builds the TLS / client-certificate connection factory, starts the
    /// publisher and consumer workers, and blocks until both have stopped.
    /// </summary>
    /// <param name="args">Command-line arguments (not used).</param>
    static void Main(string[] args)
    {
        // Broker host name; also used as the expected TLS server name.
        var hostname = "localhost";
        // Cancellation source shared by both workers.
        var tokenSource = new CancellationTokenSource();

        // Register the Ctrl+C handler before any worker starts so that an
        // early Ctrl+C cancels the workers instead of killing the process.
        Console.CancelKeyPress += (_, e) =>
        {
            e.Cancel = true;      // keep the process alive for an orderly shutdown
            tokenSource.Cancel(); // ask both workers to stop
        };

        Console.WriteLine($"start .Net RabbitMQ Example. Ctl+C to exit");

        // Client certificate presented to the broker for EXTERNAL authentication.
        X509Certificate2 cli = new X509Certificate2(@"C:\certs\client.pfx", "xxxxx");

        // Connection factory: TLS enabled, authenticating with the client
        // certificate instead of user name / password.
        var factory = new ConnectionFactory()
        {
            HostName = hostname,
            //UserName = "hogecli",
            //Password = "hogecli",
            AuthMechanisms = new IAuthMechanismFactory[] { new ExternalMechanismFactory() },
            Ssl = new SslOption
            {
                Enabled = true,
                ServerName = hostname,
                // NOTE(review): name-mismatch and chain errors on the server
                // certificate are tolerated here; tighten this outside of
                // test environments.
                AcceptablePolicyErrors = SslPolicyErrors.RemoteCertificateNameMismatch
                                       | SslPolicyErrors.RemoteCertificateChainErrors,
                Certs = new X509CertificateCollection(new X509Certificate[] { cli })
            }
        };

        // The workers observe the token themselves, so it is not passed to
        // Task.Run: a cancellation that arrives before a worker is scheduled
        // must not surface as a cancelled task in WaitAll.
        // Passing a Task-returning delegate makes Task.Run track the whole
        // publish loop, not just the part before its first await.
        var pTask = Task.Run(() => PublishLoopAsync(factory, tokenSource.Token));
        var cTask = Task.Run(() => ConsumeUntilCancelled(factory, tokenSource.Token));

        // Wait until both workers have finished (i.e. after Ctrl+C).
        Task.WaitAll(pTask, cTask);

        Console.WriteLine("stop .Net RabbitMQ Example. press any key to close.");
        Console.ReadKey();
    }

    /// <summary>
    /// Publishes a <see cref="SendMessage"/> to the "test" fanout exchange every
    /// ten seconds until <paramref name="cancel"/> is signalled.
    /// </summary>
    /// <param name="f">Factory used to open the connection.</param>
    /// <param name="cancel">Token that stops the loop.</param>
    /// <returns>A task that completes once the loop has exited and the channel
    /// and connection have been disposed.</returns>
    private static async Task PublishLoopAsync(ConnectionFactory f, CancellationToken cancel)
    {
        // Open connection and channel.
        using (var conn = f.CreateConnection())
        using (var channel = conn.CreateModel())
        {
            // Declare the exchange.
            channel.ExchangeDeclare("test", "fanout", false, true);

            while (!cancel.IsCancellationRequested)
            {
                var msg = new SendMessage()
                {
                    Message = "Hello",
                    Timestamp = DateTime.UtcNow.ToBinary()
                };
                var body = JsonConvert.SerializeObject(msg);

                // Best effort: a failed publish is reported and the loop continues.
                try
                {
                    channel.BasicPublish("test", "", null, Encoding.UTF8.GetBytes(body));
                    Console.WriteLine($"success send. message: {msg.Message}, timestamp: {msg.Timestamp}");
                }
                catch (Exception ex)
                {
                    Console.WriteLine($"failer send. reason: {ex.Message}");
                }

                // Cancellable pause so Ctrl+C is honoured immediately rather
                // than after the remainder of the ten-second delay.
                try
                {
                    await Task.Delay(10000, cancel);
                }
                catch (OperationCanceledException)
                {
                    break;
                }
            }
        }
    }

    /// <summary>
    /// Binds a server-named queue to the "test" fanout exchange, prints every
    /// message received, and returns once <paramref name="cancel"/> is signalled.
    /// </summary>
    /// <param name="f">Factory used to open the connection.</param>
    /// <param name="cancel">Token that ends the wait.</param>
    private static void ConsumeUntilCancelled(ConnectionFactory f, CancellationToken cancel)
    {
        // Open connection and channel.
        using (var conn = f.CreateConnection())
        using (var channel = conn.CreateModel())
        {
            // Declare the exchange.
            channel.ExchangeDeclare("test", "fanout", false, true);
            // Declare a server-named queue and bind it to the exchange.
            var queueName = channel.QueueDeclare().QueueName;
            channel.QueueBind(queueName, "test", "");

            // Consumer whose Received event fires for each delivery.
            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (_, ea) =>
            {
                var msg = JsonConvert.DeserializeObject<ConsumedMessage>(Encoding.UTF8.GetString(ea.Body.ToArray()));
                Console.WriteLine($"success consumed. message: {msg.Message}, timestamp: {msg.Timestamp}");
            };

            // Register the consumer (second argument true = auto-ack).
            channel.BasicConsume(queueName, true, consumer);

            // Block without spinning until cancellation is requested; the
            // channel and connection stay open for deliveries in the meantime.
            cancel.WaitHandle.WaitOne();
        }
    }
}
/// <summary>
/// Payload that the publisher serializes to JSON and sends to the exchange.
/// </summary>
public class SendMessage
{
    /// <summary>Text carried by the message.</summary>
    public string Message { get; set; }

    /// <summary>
    /// Send time as a 64-bit value; the publisher fills it with
    /// <c>DateTime.UtcNow.ToBinary()</c>.
    /// </summary>
    public long Timestamp { get; set; }
}
/// <summary>
/// Shape into which the consumer deserializes each received JSON body.
/// </summary>
public class ConsumedMessage
{
    /// <summary>Text carried by the message.</summary>
    public string Message { get; set; }

    /// <summary>Timestamp value read from the received JSON.</summary>
    public long Timestamp { get; set; }
}
}
RabbitMQはサポートしない?
[
{rabbit, [
{ssl_listeners, [5671]},
{ssl_options, [{cacertfile,"c:/certs/ca.crt"},
{certfile,"c:/certs/server.crt"},
{keyfile,"c:/certs/server.key"},
{verify,verify_none},
{fail_if_no_peer_cert,false}]}
]}
].
Listening ports amqp/ssl 0.0.0.0 5671 amqp/ssl :: 5671
rabbitmq-plugins enable rabbitmq_auth_mechanism_ssl
[
{rabbit, [
{ssl_listeners, [5671]},
{auth_mechanisms, ['EXTERNAL', 'PLAIN']},
{ssl_cert_login_from, common_name},
{ssl_options, [{cacertfile,"c:/certs/ca.crt"},
{certfile,"c:/certs/server.crt"},
{keyfile,"c:/certs/server.key"},
{verify,verify_peer},
{fail_if_no_peer_cert,true}]}
]}
].
Listening ports amqp/ssl 0.0.0.0 5671 amqp/ssl :: 5671
using System.Net.Security; // 追加
...
// ファクトリ生成
var factory = new ConnectionFactory()
{
HostName = hostname,
Ssl = new SslOption // 追加
{
Enabled = true,
ServerName = hostname,
AcceptablePolicyErrors = SslPolicyErrors.RemoteCertificateNameMismatch
| SslPolicyErrors.RemoteCertificateChainErrors,
}
};
var factory = new ConnectionFactory()
{
HostName = hostname,
UserName = "xxxxx", // 追加
Password = "xxxxx", // 追加
using System.Security.Cryptography.X509Certificates; // 追加
...
X509Certificate2 cli = new X509Certificate2(@"C:\certs\client.pfx", "xxxxx"); // 追加
// ファクトリ生成
var factory = new ConnectionFactory()
{
HostName = hostname,
//UserName = "xxxxx", // 削除
//Password = "xxxxx", // 削除
AuthMechanisms = new IAuthMechanismFactory[]{ new ExternalMechanismFactory()}, // 追加
Ssl = new SslOption
{
Enabled = true,
ServerName = hostname,
AcceptablePolicyErrors = SslPolicyErrors.RemoteCertificateNameMismatch
| SslPolicyErrors.RemoteCertificateChainErrors,
Certs = new X509CertificateCollection(new X509Certificate[] { cli }) // 追加
}
};