マイクロソフト系技術情報 Wiki」は、「Open棟梁Project」,「OSSコンソーシアム .NET開発基盤部会」によって運営されています。

目次

概要

ここでは、C#で、IoTデバイス → Azure Event Hubs → Azure Databricksと繋いでみる。

詳細

認証・認可

Kafkaエンドポイントの場合は、KafkaのSecurityProtocol?で使用可能なもの。

Event Hubsの構築

  • 取り敢えず、Kafkaエンドポイントを持つEvent Hubsを構築してみる。

Kafkaエンドポイント

  • SKU
    • Basic レベルではサポートされていない。
    • Standard レベルを選択すれば利用可能である模様。
  • 圧縮/メッセージフォーマットは現在サポートしていない。
  • 認証機構としてSASL認証をサポートしている。
    が、その場合(≒IoTデバイスから接続する場合)、
    推奨は、標準の接続文字列(SASトークン)の利用らしい。
  • 追加の制限事項
    • group.id プロパティの最大長は 256 文字
    • offset.metadata.max.bytes の最大サイズは 1024 バイト

IaC化(Azure CLI

  • リソース・グループがない場合、作成
    az group create --name [ResourceGroupName] --location [Location]
    例:az group create --name EventHubsRG --location "Japan East"
  • Event Hubs 名前空間を作成
    az eventhubs namespace create --name [EventHubsNamespace] --resource-group [既存のRG名] -l [Location]
    例:az eventhubs namespace create --name osscjpdevinfra --resource-group EventHubsRG -l "Japan East"
    ※ 省略してある--skuの既定値はStandardになっている。
  • イベント ハブの作成
    az eventhubs eventhub create --name [EventHubsName] --resource-group [既存のRG名] --namespace-name [EventHubs名前空間]
    例:az eventhubs eventhub create --name OsscJpDevInfra --resource-group EventHubsRG --namespace-name osscjpdevinfra

Kafka .NET クライアントで送受信

先ずは、正しく構築できたことを確認するために送受信を行う。

接続文字列の取得

ポータルの「共有アクセス ポリシー」から取得する。

Endpoint=sb://[EventHubsNamespace].servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx

※ 内部的には、SASトークンを使用する。

サンプルをクローンして実行

https://github.com/Azure/azure-event-hubs-for-kafka/tree/master/quickstart/dotnet

  • 設定値
  • 環境に合わせて設定する。
    • EH_FQDN
    • EH_CONNECTION_STRING
  • CA_CERT_LOCATION
    • そのままでも動作する(また、設定しなくても動作する)
    • パラメタとしては、Confluent.KafkaのSslCaLocation?
    • Azure Event HubsのKafkaエンドポイントは既定でSSLらしい。
    • 必要であれが、このパラメタで、認証局の証明書をポイントする。
  • 実行結果の例
    Initializing Producer
    Sending 10 messages to topic: test, broker(s): osscjpdevinfra.servicebus.windows.net:9093
    Message 0 sent (value: 'Sample message #0 sent at 2021-07-23_18:19:22.9101')
    Message 1 sent (value: 'Sample message #1 sent at 2021-07-23_18:19:23.3055')
    Message 2 sent (value: 'Sample message #2 sent at 2021-07-23_18:19:23.3599')
    Message 3 sent (value: 'Sample message #3 sent at 2021-07-23_18:19:23.4080')
    Message 4 sent (value: 'Sample message #4 sent at 2021-07-23_18:19:23.4652')
    Message 5 sent (value: 'Sample message #5 sent at 2021-07-23_18:19:23.5150')
    Message 6 sent (value: 'Sample message #6 sent at 2021-07-23_18:19:23.5784')
    Message 7 sent (value: 'Sample message #7 sent at 2021-07-23_18:19:23.6300')
    Message 8 sent (value: 'Sample message #8 sent at 2021-07-23_18:19:23.6865')
    Message 9 sent (value: 'Sample message #9 sent at 2021-07-23_18:19:23.7695')
    
    Initializing Consumer
    Consuming messages from topic: test, broker(s): osscjpdevinfra.servicebus.windows.net:9093
    Received: 'Sample message #0 sent at 2021-07-23_18:19:22.9101'
    Received: 'Sample message #1 sent at 2021-07-23_18:19:23.3055'
    Received: 'Sample message #2 sent at 2021-07-23_18:19:23.3599'
    Received: 'Sample message #3 sent at 2021-07-23_18:19:23.4080'
    Received: 'Sample message #4 sent at 2021-07-23_18:19:23.4652'
    Received: 'Sample message #5 sent at 2021-07-23_18:19:23.5150'
    Received: 'Sample message #6 sent at 2021-07-23_18:19:23.5784'
    Received: 'Sample message #7 sent at 2021-07-23_18:19:23.6300'
    Received: 'Sample message #8 sent at 2021-07-23_18:19:23.6865'
    Received: 'Sample message #9 sent at 2021-07-23_18:19:23.7695'

.NET for Apache Sparkで送受信

  • 受信処理(DataFrame?ReadStream?)の結果をDataFrame?.Show()で出力する。
  • なお、送信処理(DataFrame?WriteStream?)もサポートしている。

パラメタの初期化(共通)

SASトークンを使用する場合の設定例

string BOOTSTRAP_SERVERS = "hostname:9093"; // 9093 is the port used to communicate with Event Hubs, see [troubleshooting guide](https://docs.microsoft.com/azure/event-hubs/troubleshooting-guide)
string EH_SASL = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"<CONNECTION_STRING>\";"; // Connection string obtained from Step 1

ReadStream?

SparkSession spark = SparkSession
    .Builder()
    .AppName("Connect Event Hub")
    .GetOrCreate();

DataFrame df = spark
    .ReadStream()
    .Format("kafka")
    .Option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
    .Option("subscribe", "spark-test")
    .Option("kafka.sasl.mechanism", "PLAIN")
    .Option("kafka.security.protocol", "SASL_SSL")
    .Option("kafka.sasl.jaas.config", EH_SASL)
    .Option("kafka.request.timeout.ms", "60000")
    .Option("kafka.session.timeout.ms", "60000")
    .Option("failOnDataLoss", "false")
    .Load();

DataFrame dfWrite = df
    .WriteStream()
    .OutputMode("append")
    .Format("console")
    .Start();

WriteStream?

df.WriteStream()
    .Format("kafka")
    .Option("topic", topics)
    .Option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
    .Option("kafka.sasl.mechanism", "PLAIN")
    .Option("kafka.security.protocol", "SASL_SSL")
    .Option("kafka.sasl.jaas.config", EH_SASL)
    .Option("checkpointLocation", "./checkpoint")
    .Start();

Azure Databricksにデプロイ

上記の受信処理Azure Databricksにデプロイ

参考

Qiita

https://qiita.com/tags/eventhubs

GitHub

confluent-kafka-dotnet

Azure Event Hubs for Apache Kafka Ecosystems

https://github.com/Azure/azure-event-hubs-for-kafka

Microsoft Docs

クイック スタート

  • イベントの送受信

チュートリアル

操作方法ガイド

ナレコムAzureレシピ

https://azure-recipe.kc-cloud.jp/category/azure-event-hubs/

内部リンク

Apache Kafka

Azure Databricks

.NET for Apache Spark


Tags: :クラウド, :ビッグデータ, :Azure


トップ   編集 凍結 差分 バックアップ 添付 複製 名前変更 リロード   新規 一覧 単語検索 最終更新   ヘルプ   最終更新のRSS
Last-modified: 2021-07-24 (土) 01:13:41 (3d)