「[[マイクロソフト系技術情報 Wiki>http://techinfoofmicrosofttech.osscons.jp/]]」は、「[[Open棟梁Project>https://github.com/OpenTouryoProject/]]」,「[[OSSコンソーシアム .NET開発基盤部会>https://www.osscons.jp/dotNetDevelopmentInfrastructure/]]」によって運営されています。

-[[戻る>.NET for Apache Spark]]
--[[.NET for Apache Sparkチュートリアル]]
--.NET for Apache Sparkのデータ接続
--[[.NET for Apache SparkのSQL]]

* 目次 [#l6fd2e48]
#contents

*概要 [#cc875afc]
.NET で Spark の実装ができる for Apache Spark。

*詳細 [#x9270354]
Sparkの仕様なので、[[Azure HDInsight]]・[[Azure Databricks]]の差異はない。

**ファイルの場合 [#la597a33]

***ローカル [#h77d6861]
ローカル・ファイルを読むこともできる。

***[[Azure HDInsight]] [#k6dae843]
[[マウントされたストレージに直接アップロード>Azure HDInsight#z4b8527c]]してソレを読む。

***[[Azure Databricks]] [#jcd6adda]
[[Databricks CLIを使用してアップロード>Azure Databricks#uee92a05]]してソレを読む。

***WASB or ABFS [#t901d2aa]
下記に対するURLを使用することも出来る。

-WASB[S]: Azure storage.~
wasb://<container_name>@<storage_account_name>.blob.core.windows.net

-ABFS[S]: Azure Data Lake Storage Gen2.~
abfs://<file_system>@<account_name>.dfs.core.windows.net

**Clientの場合 [#ac6faf6f]
-SparkSessionのReadStreamメソッドでDataFrameを読み込む。
-[[LoopではなくStreamingQueryを使用して実装する。>.NET for Apache Sparkチュートリアル#v2288c2e]]

***TCP/IPソケット [#y3026999]
.Format("socket")

-ホスト名~
.Option("host", hostname)
-ポート番号~
.Option("port", port)

***[[Azure Event Hubs]] [#g36a8526]
[[コチラ>Azure Databricksチュートリアル#y4d4dbfd]]が参考になる。

***Apache Kafka [#efb0d878]
正確には、[[Azure Event HubsのKafka エンドポイント>Azure Event Hubsチュートリアル#od841821]]

-受信処理(DataFrameのReadStream)の結果をDataFrame.Show()で出力する。

-なお、送信処理(DataFrameのWriteStream)もサポートしている。

-送受信処理の実装
--パラメタの初期化(共通)~
[[SASトークン>#y03d6018]]を使用する場合の設定例
 string BOOTSTRAP_SERVERS = "hostname:9093"; // 9093 is the port used to communicate with Event Hubs, see [troubleshooting guide](https://docs.microsoft.com/azure/event-hubs/troubleshooting-guide)
 string EH_SASL = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"<CONNECTION_STRING>\";"; // Connection string obtained from Step 1

--ReadStream
 SparkSession spark = SparkSession
     .Builder()
     .AppName("Connect Event Hub")
     .GetOrCreate();
 
 DataFrame df = spark
     .ReadStream()
     .Format("kafka")
     .Option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
     .Option("subscribe", "spark-test")
     .Option("kafka.sasl.mechanism", "PLAIN")
     .Option("kafka.security.protocol", "SASL_SSL")
     .Option("kafka.sasl.jaas.config", EH_SASL)
     .Option("kafka.request.timeout.ms", "60000")
     .Option("kafka.session.timeout.ms", "60000")
     .Option("failOnDataLoss", "false")
     .Load();
 
 DataFrame dfWrite = df
     .WriteStream()
     .OutputMode("append")
     .Format("console")
     .Start();

--WriteStream
 df.WriteStream()
     .Format("kafka")
     .Option("topic", topics)
     .Option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
     .Option("kafka.sasl.mechanism", "PLAIN")
     .Option("kafka.security.protocol", "SASL_SSL")
     .Option("kafka.sasl.jaas.config", EH_SASL)
     .Option("checkpointLocation", "./checkpoint")
     .Start();

-チュートリアルの書き換え~
[[.NET for Apache Sparkの構造化ストリーミングのチュートリアル>.NET for Apache Sparkチュートリアル#v2288c2e]]の受信部を書き換えて実行してみる。

--書換
 // Create initial DataFrame
 string BOOTSTRAP_SERVERS = "xxxx.servicebus.windows.net:9093";
 string CONNECTION_STRING = "Endpoint=sb://xxxx.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=xxxx";
 string EH_SASL = $"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{CONNECTION_STRING}\";";
 DataFrame lines = spark
     .ReadStream()
     .Format("kafka")
     .Option("subscribe", "test")
     .Option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
     .Option("kafka.sasl.mechanism", "PLAIN")
     .Option("kafka.security.protocol", "SASL_SSL")
     .Option("kafka.sasl.jaas.config", EH_SASL)
     .Option("kafka.request.timeout.ms", "60000")
     .Option("kafka.session.timeout.ms", "60000")
     .Option("kafka.group.id", "$Default")
     .Option("failOnDataLoss", "false")
     .Load();

--実行
 >spark-submit ^
 --class org.apache.spark.deploy.dotnet.DotnetRunner ^
 --master local ^
 microsoft-spark-3-0_2.12-2.0.0.jar ^
 dotnet mySparkStreamingApp.dll
※ エラー発生中(調査中

**Serverの場合 [#g40c913e]
サーバーにはならない

*参考 [#o8fa57c9]
**microsoft.com [#pe74c5ab]
***[[Microsoft Docs > .NET for Apache Spark ガイド>.NET for Apache Spark#w086a731]] [#i96a74c2]
-使い方ガイド > データーへの接続
--Azure Data Lake Storage Gen 2 または WASB アカウントに接続する~
https://docs.microsoft.com/ja-jp/dotnet/spark/how-to-guides/connect-to-azure-storage
--.NET for Apache Spark を Azure Event Hubs に接続する~
https://docs.microsoft.com/ja-jp/dotnet/spark/how-to-guides/connect-to-event-hub
--.NET for Apache Spark を MongoDB に接続する~
https://docs.microsoft.com/ja-jp/dotnet/spark/how-to-guides/connect-to-mongo-db
--.NET for Apache Spark を SQL Server に接続する~
https://docs.microsoft.com/ja-jp/dotnet/spark/how-to-guides/connect-to-sql-server

----
Tags: [[:クラウド]], [[:Azure]], [[:.NET開発]], [[:.NET Core]], [[:.NET Standard]]

トップ   編集 差分 バックアップ 添付 複製 名前変更 リロード   新規 一覧 単語検索 最終更新   ヘルプ   最終更新のRSS