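The "Microsoft Technical Information Wiki" is operated by the "Open棟梁Project" and the "OSS Consortium .NET Development Infrastructure Subcommittee" (OSSコンソーシアム .NET開発基盤部会).
This page uses C# to connect an IoT device → Azure Event Hubs → Azure Databricks.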
az group create --name [ResourceGroupName] --location [Location]
Example: az group create --name EventHubsRG --location "Japan East"
az eventhubs namespace create --name [EventHubsNamespace] --resource-group [ExistingResourceGroupName] -l [Location]
Example: az eventhubs namespace create --name osscjpdevinfra --resource-group EventHubsRG -l "Japan East"
Note: --sku is omitted here; its default value is Standard.
az eventhubs eventhub create --name [EventHubsName] --resource-group [ExistingResourceGroupName] --namespace-name [EventHubsNamespace]
Example: az eventhubs eventhub create --name OsscJpDevInfra --resource-group EventHubsRG --namespace-name osscjpdevinfra
az eventhubs namespace network-rule add --resource-group [ResourceGroupName] --namespace-name [EventHubsNamespace] --ip-address xxx.xxx.xxx.xxx/24 --action Allow
Example: az eventhubs namespace network-rule add --resource-group EventHubsRG --namespace-name osscjpdevinfra --ip-address xxx.xxx.xxx.xxx/24 --action Allow
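Note: for data-pipeline workloads, a region such as "West US 2" may be a better choice.
First, send and receive messages to confirm that the resources were set up correctly.
Obtain the connection string from "Shared access policies" in the portal.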
Endpoint=sb://[EventHubsNamespace].servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
Note: internally, a SAS token is used.
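To make the note about SAS tokens concrete, the following is a minimal sketch of how a token can be derived from a shared access key, following the publicly documented format (HMAC-SHA256 over the URL-encoded resource URI and the expiry time). The helper name `CreateSasToken`, its parameters, and the use of C# top-level statements (.NET 6 or later) are assumptions made for illustration; they are not part of the original quickstart, and the Kafka client may build its token differently.

```csharp
using System;
using System.Globalization;
using System.Net;
using System.Security.Cryptography;
using System.Text;

// Hypothetical helper (not from the original article): builds a SAS token string
// of the form "SharedAccessSignature sr=...&sig=...&se=...&skn=...".
static string CreateSasToken(string resourceUri, string keyName, string key, DateTimeOffset expiresAt)
{
    // Expiry is expressed as seconds since the Unix epoch.
    string expiry = expiresAt.ToUnixTimeSeconds().ToString(CultureInfo.InvariantCulture);
    string encodedUri = WebUtility.UrlEncode(resourceUri);

    // The string to sign is the URL-encoded resource URI, a newline, and the expiry.
    string stringToSign = encodedUri + "\n" + expiry;

    // Sign with HMAC-SHA256, using the shared access key as the secret.
    using var hmac = new HMACSHA256(Encoding.UTF8.GetBytes(key));
    string signature = Convert.ToBase64String(hmac.ComputeHash(Encoding.UTF8.GetBytes(stringToSign)));

    return $"SharedAccessSignature sr={encodedUri}&sig={WebUtility.UrlEncode(signature)}&se={expiry}&skn={keyName}";
}

// Usage: the namespace and key below are placeholders, not real credentials.
string sampleToken = CreateSasToken(
    "sb://osscjpdevinfra.servicebus.windows.net/",
    "RootManageSharedAccessKey",
    "xxxx",
    DateTimeOffset.UtcNow.AddHours(1));
Console.WriteLine(sampleToken);
```

Passing the expiry in as a parameter keeps the function deterministic, which makes it easier to compare its output with a token produced by another tool.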
https://github.com/Azure/azure-event-hubs-for-kafka/tree/master/quickstart/dotnet
Initializing Producer
Sending 10 messages to topic: test, broker(s): osscjpdevinfra.servicebus.windows.net:9093
Message 0 sent (value: 'Sample message #0 sent at 2021-07-23_18:19:22.9101')
Message 1 sent (value: 'Sample message #1 sent at 2021-07-23_18:19:23.3055')
Message 2 sent (value: 'Sample message #2 sent at 2021-07-23_18:19:23.3599')
Message 3 sent (value: 'Sample message #3 sent at 2021-07-23_18:19:23.4080')
Message 4 sent (value: 'Sample message #4 sent at 2021-07-23_18:19:23.4652')
Message 5 sent (value: 'Sample message #5 sent at 2021-07-23_18:19:23.5150')
Message 6 sent (value: 'Sample message #6 sent at 2021-07-23_18:19:23.5784')
Message 7 sent (value: 'Sample message #7 sent at 2021-07-23_18:19:23.6300')
Message 8 sent (value: 'Sample message #8 sent at 2021-07-23_18:19:23.6865')
Message 9 sent (value: 'Sample message #9 sent at 2021-07-23_18:19:23.7695')
Initializing Consumer
Consuming messages from topic: test, broker(s): osscjpdevinfra.servicebus.windows.net:9093
Received: 'Sample message #0 sent at 2021-07-23_18:19:22.9101'
Received: 'Sample message #1 sent at 2021-07-23_18:19:23.3055'
Received: 'Sample message #2 sent at 2021-07-23_18:19:23.3599'
Received: 'Sample message #3 sent at 2021-07-23_18:19:23.4080'
Received: 'Sample message #4 sent at 2021-07-23_18:19:23.4652'
Received: 'Sample message #5 sent at 2021-07-23_18:19:23.5150'
Received: 'Sample message #6 sent at 2021-07-23_18:19:23.5784'
Received: 'Sample message #7 sent at 2021-07-23_18:19:23.6300'
Received: 'Sample message #8 sent at 2021-07-23_18:19:23.6865'
Received: 'Sample message #9 sent at 2021-07-23_18:19:23.7695'
// 9093 is the port used to communicate with Event Hubs, see [troubleshooting guide](https://docs.microsoft.com/azure/event-hubs/troubleshooting-guide)
string BOOTSTRAP_SERVERS = "hostname:9093";
// Connection string obtained from Step 1
string EH_SASL = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"<CONNECTION_STRING>\";";
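Both values above can be derived from the connection string itself, which avoids keeping the host name and the JAAS string in sync by hand. The following is a minimal sketch under that assumption; the helper names `ParseConnectionString`, `ToBootstrapServers`, and `ToJaasConfig` are hypothetical and not part of any SDK, and the code assumes C# top-level statements (.NET 6 or later).

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: splits "Key=Value;Key=Value" into a dictionary.
static Dictionary<string, string> ParseConnectionString(string connectionString)
{
    var parts = new Dictionary<string, string>(StringComparer.OrdinalIgnoreCase);
    foreach (string segment in connectionString.Split(';', StringSplitOptions.RemoveEmptyEntries))
    {
        // Split on the first '=' only, because base64 keys may themselves end with '='.
        int eq = segment.IndexOf('=');
        if (eq <= 0) continue;
        parts[segment.Substring(0, eq).Trim()] = segment.Substring(eq + 1).Trim();
    }
    return parts;
}

// Hypothetical helper: takes the host from "Endpoint=sb://host/" and appends the Kafka port 9093.
static string ToBootstrapServers(string connectionString)
{
    string endpoint = ParseConnectionString(connectionString)["Endpoint"];
    return new Uri(endpoint).Host + ":9093";
}

// Hypothetical helper: the user name is the literal "$ConnectionString",
// and the password is the whole connection string.
static string ToJaasConfig(string connectionString) =>
    "org.apache.kafka.common.security.plain.PlainLoginModule required " +
    $"username=\"$ConnectionString\" password=\"{connectionString}\";";

// Usage: the key below is a placeholder, not a real credential.
string sampleConnectionString =
    "Endpoint=sb://osscjpdevinfra.servicebus.windows.net/;" +
    "SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=xxxx";
Console.WriteLine(ToBootstrapServers(sampleConnectionString)); // osscjpdevinfra.servicebus.windows.net:9093
Console.WriteLine(ToJaasConfig(sampleConnectionString));
```

In a real application the connection string would normally come from configuration or a secret store rather than a string literal.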
// Requires: using Microsoft.Spark.Sql; using Microsoft.Spark.Sql.Streaming;
SparkSession spark = SparkSession
    .Builder()
    .AppName("Connect Event Hub")
    .GetOrCreate();

// Read from the Kafka endpoint of Event Hubs as a streaming DataFrame.
DataFrame df = spark
    .ReadStream()
    .Format("kafka")
    .Option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
    .Option("subscribe", "spark-test")
    .Option("kafka.sasl.mechanism", "PLAIN")
    .Option("kafka.security.protocol", "SASL_SSL")
    .Option("kafka.sasl.jaas.config", EH_SASL)
    .Option("kafka.request.timeout.ms", "60000")
    .Option("kafka.session.timeout.ms", "60000")
    .Option("failOnDataLoss", "false")
    .Load();

// Start() returns a StreamingQuery, not a DataFrame.
StreamingQuery query = df
    .WriteStream()
    .OutputMode("append")
    .Format("console")
    .Start();

// Block until the streaming query stops.
query.AwaitTermination();
// "topics" holds the name of the destination event hub (Kafka topic) and is defined elsewhere.
df.WriteStream()
    .Format("kafka")
    .Option("topic", topics)
    .Option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
    .Option("kafka.sasl.mechanism", "PLAIN")
    .Option("kafka.security.protocol", "SASL_SSL")
    .Option("kafka.sasl.jaas.config", EH_SASL)
    .Option("checkpointLocation", "./checkpoint")
    .Start();
Rewrite the receiving part of the .NET for Apache Spark structured streaming tutorial and run it.
// Create initial DataFrame
string BOOTSTRAP_SERVERS = "xxxx.servicebus.windows.net:9093";
string CONNECTION_STRING = "Endpoint=sb://xxxx.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=xxxx";
string EH_SASL = $"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{CONNECTION_STRING}\";";

DataFrame lines = spark
    .ReadStream()
    .Format("kafka")
    .Option("subscribe", "test")
    .Option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
    .Option("kafka.sasl.mechanism", "PLAIN")
    .Option("kafka.security.protocol", "SASL_SSL")
    .Option("kafka.sasl.jaas.config", EH_SASL)
    .Option("kafka.request.timeout.ms", "60000")
    .Option("kafka.session.timeout.ms", "60000")
    .Option("kafka.group.id", "$Default")
    .Option("failOnDataLoss", "false")
    .Load();
>spark-submit ^
  --class org.apache.spark.deploy.dotnet.DotnetRunner ^
  --master local ^
  microsoft-spark-3-0_2.12-2.0.0.jar ^
  dotnet mySparkStreamingApp.dll
Note: this currently fails with an error (under investigation).
Deploy the receiving code above to Azure Databricks.
https://qiita.com/tags/eventhubs
https://github.com/Azure/azure-event-hubs-for-kafka
https://azure-recipe.kc-cloud.jp/category/azure-event-hubs/