マイクロソフト系技術情報 Wiki」は、「Open棟梁Project」,「OSSコンソーシアム .NET開発基盤部会」によって運営されています。

目次

概要

ここでは、C#で、IoTデバイス → Azure Event Hubs → Azure Databricksと繋いでみる。

詳細

認証・認可

Kafkaエンドポイントの場合は、KafkaのSecurityProtocol?で使用可能なもの。

Event Hubsの構築

Kafkaエンドポイント

IaC化(Azure CLI

Kafka .NET クライアントで送受信

先ずは、正しく構築できたことを確認するために送受信を行う。

接続文字列の取得

ポータルの「共有アクセス ポリシー」から取得する。

Endpoint=sb://[EventHubsNamespace].servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx

※ 内部的には、SASトークンを使用する。

サンプルをクローンして実行

https://github.com/Azure/azure-event-hubs-for-kafka/tree/master/quickstart/dotnet

.NET for Apache Sparkで送受信

パラメタの初期化(共通)

SASトークンを使用する場合の設定例

string BOOTSTRAP_SERVERS = "hostname:9093"; // 9093 is the port used to communicate with Event Hubs, see [troubleshooting guide](https://docs.microsoft.com/azure/event-hubs/troubleshooting-guide)
string EH_SASL = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"<CONNECTION_STRING>\";"; // Connection string obtained from Step 1

ReadStream?

SparkSession spark = SparkSession
    .Builder()
    .AppName("Connect Event Hub")
    .GetOrCreate();

DataFrame df = spark
    .ReadStream()
    .Format("kafka")
    .Option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
    .Option("subscribe", "spark-test")
    .Option("kafka.sasl.mechanism", "PLAIN")
    .Option("kafka.security.protocol", "SASL_SSL")
    .Option("kafka.sasl.jaas.config", EH_SASL)
    .Option("kafka.request.timeout.ms", "60000")
    .Option("kafka.session.timeout.ms", "60000")
    .Option("failOnDataLoss", "false")
    .Load();

DataFrame dfWrite = df
    .WriteStream()
    .OutputMode("append")
    .Format("console")
    .Start();

WriteStream?

df.WriteStream()
    .Format("kafka")
    .Option("topic", topics)
    .Option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
    .Option("kafka.sasl.mechanism", "PLAIN")
    .Option("kafka.security.protocol", "SASL_SSL")
    .Option("kafka.sasl.jaas.config", EH_SASL)
    .Option("checkpointLocation", "./checkpoint")
    .Start();

Azure Databricksにデプロイ

上記の受信処理Azure Databricksにデプロイ

参考

Qiita

https://qiita.com/tags/eventhubs

GitHub

confluent-kafka-dotnet

Azure Event Hubs for Apache Kafka Ecosystems

https://github.com/Azure/azure-event-hubs-for-kafka

Microsoft Docs

クイック スタート

チュートリアル

操作方法ガイド

ナレコムAzureレシピ

https://azure-recipe.kc-cloud.jp/category/azure-event-hubs/

内部リンク

Apache Kafka

Azure Databricks

.NET for Apache Spark


Tags: :クラウド, :ビッグデータ, :Azure


トップ   新規 一覧 単語検索 最終更新   ヘルプ   最終更新のRSS