マイクロソフト系技術情報 Wiki」は、「Open棟梁Project」,「OSSコンソーシアム .NET開発基盤部会」によって運営されています。

目次

概要

ここでは、C#で、IoTデバイス → Azure Event Hubs → Azure Databricksと繋いでみる。

詳細

Event Hubsの構築

  • 取り敢えず、Kafkaエンドポイントを持つEvent Hubsを構築してみる。

Kafkaエンドポイント

  • SKU
    • Basic レベルではサポートされていない。
    • Standard レベルを選択すれば利用可能である模様。
  • マッピング
    Event HubsKafka
    名前空間クラスター
    イベントハブトピック
    PartitionPartition
    コンシューマーグループコンシューマーグループ
    OffsetOffset
  • 圧縮/メッセージフォーマットは現在サポートしていない。
  • 認証機構としてSASL認証をサポートしている。
    が、その場合(≒IoTデバイスから接続する場合)、
    推奨は、標準の接続文字列(SASトークン)の利用らしい。
  • 追加の制限事項
    • group.id プロパティの最大長は 256 文字
    • offset.metadata.max.bytes の最大サイズは 1024 バイト

Event Hubsの作成

  • ポータルから
  • リソース グループを作成
    データ・パープライン系は「West US 2」辺りが良いかも。
  • Event Hubs 名前空間を作成
  • Event Hubs を作成
  • IaC化
  • リソース・グループがない場合、作成
    az group create --name [ResourceGroupName] --location [Location]
    例:az group create --name EventHubsRG --location "Japan East"
  • Event Hubs 名前空間を作成
    az eventhubs namespace create --name [EventHubsNamespace] --resource-group [既存のRG名] -l [Location]
    例:az eventhubs namespace create --name osscjpdevinfra --resource-group EventHubsRG -l "Japan East"
    ※ 省略してある--skuの既定値はStandardになっている。
  • イベント ハブの作成
    az eventhubs eventhub create --name [EventHubsName] --resource-group [既存のRG名] --namespace-name [EventHubs名前空間]
    例:az eventhubs eventhub create --name OsscJpDevInfra --resource-group EventHubsRG --namespace-name osscjpdevinfra
  • IPアドレス制限
    az eventhubs namespace network-rule add --resource-group [ResourceGroupName] --namespace-name [EventHubs名前空間] --ip-address xxx.xxx.xxx.xxx/24 --action Allow
    例:az eventhubs namespace network-rule add --resource-group EventHubsRG --namespace-name osscjpdevinfra --ip-address xxx.xxx.xxx.xxx/24 --action 

純正 Kafkaクライアントで送受信

環境構築

接続文字列の取得

  • primaryConnectionString?を取得する。

コマンドで送受信

WSL2で実行する。

  • 準備
    • 取得
      git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
      cd azure-event-hubs-for-kafka/quickstart/kafka-cli
  • 編集(参考
    password部分にprimaryConnectionString?値を埋める。
    $ dir
    README.md  client_common.properties  jaas.conf
    $ vi jaas.conf
  • 送受信
    kafkaInstallHome=/usr/local/kafka
    export KAFKA_OPTS="-Djava.security.auth.login.config=jaas.conf"
    echo $kafkaInstallHome
    echo $KAFKA_OPTS
  • 受信
    $kafkaInstallHome/bin/kafka-console-consumer.sh --topic test_topic --bootstrap-server $eventhubsNameSpace.servicebus.windows.net:9093 --consumer.config client_common.properties
  • 送信
    $kafkaInstallHome/bin/kafka-console-producer.sh --topic test_topic --broker-list $eventhubsNameSpace.servicebus.windows.net:9093 --producer.config client_common.properties

※ $eventhubsNameSpace?は、コチラで設定した値。

Kafka .NET クライアントで送受信

接続文字列の取得

ポータルの「共有アクセス ポリシー」から取得する。

Endpoint=sb://[EventHubsNamespace].servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx

※ 内部的には、SASトークンを使用する。

サンプルをクローンして実行

  • 設定値
  • 環境に合わせて設定する。
    • EH_FQDN
    • EH_CONNECTION_STRING
  • CA_CERT_LOCATION
    • そのままでも動作する(また、設定しなくても動作する)
    • パラメタとしては、Confluent.KafkaのSslCaLocation?
    • Azure Event HubsのKafkaエンドポイントは既定でSSLらしい。
    • 必要であれば、このパラメタで、認証局の証明書をポイントする。
  • 実行結果の例
    Initializing Producer
    Sending 10 messages to topic: test, broker(s): osscjpdevinfra.servicebus.windows.net:9093
    Message 0 sent (value: 'Sample message #0 sent at 2021-07-23_18:19:22.9101')
    Message 1 sent (value: 'Sample message #1 sent at 2021-07-23_18:19:23.3055')
    Message 2 sent (value: 'Sample message #2 sent at 2021-07-23_18:19:23.3599')
    Message 3 sent (value: 'Sample message #3 sent at 2021-07-23_18:19:23.4080')
    Message 4 sent (value: 'Sample message #4 sent at 2021-07-23_18:19:23.4652')
    Message 5 sent (value: 'Sample message #5 sent at 2021-07-23_18:19:23.5150')
    Message 6 sent (value: 'Sample message #6 sent at 2021-07-23_18:19:23.5784')
    Message 7 sent (value: 'Sample message #7 sent at 2021-07-23_18:19:23.6300')
    Message 8 sent (value: 'Sample message #8 sent at 2021-07-23_18:19:23.6865')
    Message 9 sent (value: 'Sample message #9 sent at 2021-07-23_18:19:23.7695')
    
    Initializing Consumer
    Consuming messages from topic: test, broker(s): osscjpdevinfra.servicebus.windows.net:9093
    Received: 'Sample message #0 sent at 2021-07-23_18:19:22.9101'
    Received: 'Sample message #1 sent at 2021-07-23_18:19:23.3055'
    Received: 'Sample message #2 sent at 2021-07-23_18:19:23.3599'
    Received: 'Sample message #3 sent at 2021-07-23_18:19:23.4080'
    Received: 'Sample message #4 sent at 2021-07-23_18:19:23.4652'
    Received: 'Sample message #5 sent at 2021-07-23_18:19:23.5150'
    Received: 'Sample message #6 sent at 2021-07-23_18:19:23.5784'
    Received: 'Sample message #7 sent at 2021-07-23_18:19:23.6300'
    Received: 'Sample message #8 sent at 2021-07-23_18:19:23.6865'
    Received: 'Sample message #9 sent at 2021-07-23_18:19:23.7695'

Apache Spark系

.NET for Apache Sparkで送受信

Scalaで送受信

PySparkで送受信

Azure Databricksから接続

上記の受信処理Azure Databricks上で実行する。

参考

Qiita

https://qiita.com/tags/eventhubs

GitHub

confluent-kafka-dotnet

Azure Event Hubs for Apache Kafka Ecosystems

https://github.com/Azure/azure-event-hubs-for-kafka

Microsoft Docs

クイック スタート

  • イベントの送受信

チュートリアル

操作方法ガイド

  • Kafkaエンドポイント

ナレコムAzureレシピ

https://azure-recipe.kc-cloud.jp/category/azure-event-hubs/

概要とパフォーマンス

触ってみた

内部リンク

Apache Kafka

Azure Databricks

.NET for Apache Spark


Tags: :クラウド, :ビッグデータ, :Azure


トップ   編集 凍結 差分 バックアップ 添付 複製 名前変更 リロード   新規 一覧 単語検索 最終更新   ヘルプ   最終更新のRSS
Last-modified: 2021-10-20 (水) 12:08:39 (4d)