「[[マイクロソフト系技術情報 Wiki>http://techinfoofmicrosofttech.osscons.jp/]]」は、「[[Open棟梁Project>https://github.com/OpenTouryoProject/]]」,「[[OSSコンソーシアム .NET開発基盤部会>https://www.osscons.jp/dotNetDevelopmentInfrastructure/]]」によって運営されています。 -[[戻る>.NET for Apache Spark]] --[[.NET for Apache Sparkチュートリアル]] --.NET for Apache Sparkのデータ接続 --[[.NET for Apache SparkのSQL]] * 目次 [#l6fd2e48] #contents *概要 [#cc875afc] .NET で Spark の実装ができる for Apache Spark。 *詳細 [#x9270354] Sparkの仕様なので、[[Azure HDInsight]]・[[Azure Databricks]]の差異はない。 **ファイルの場合 [#la597a33] ***ローカル [#h77d6861] ローカル・ファイルを読むこともできる。 ***[[Azure HDInsight]] [#k6dae843] [[マウントされたストレージに直接アップロード>Azure HDInsight#z4b8527c]]してソレを読む。 ***[[Azure Databricks]] [#jcd6adda] [[Databricks CLIを使用してアップロード>Azure Databricks#uee92a05]]してソレを読む。 ***WASB or ABFS [#t901d2aa] 下記に対するURLを使用することも出来る。 -WASB[S]: Azure storage.~ wasb://<container_name>@<storage_account_name>.blob.core.windows.net -ABFS[S]: Azure Data Lake Storage Gen2.~ abfs://<file_system>@<account_name>.dfs.core.windows.net **Clientの場合 [#ac6faf6f] -SparkSessionのReadStreamメソッドでDataFrameを読み込む。 -[[LoopではなくStreamingQueryを使用して実装する。>.NET for Apache Sparkチュートリアル#v2288c2e]] ***TCP/IPソケット [#y3026999] .Format("socket") -ホスト名~ .Option("host", hostname) -ポート番号~ .Option("port", port) ***[[Azure Event Hubs]] [#g36a8526] [[コチラ>Azure Databricksチュートリアル#y4d4dbfd]]が参考になる。 ***Apache Kafka [#efb0d878] 正確には、[[Azure Event HubsのKafka エンドポイント>Azure Event Hubsチュートリアル#od841821]] -受信処理(DataFrameのReadStream)の結果をDataFrame.Show()で出力する。 -なお、送信処理(DataFrameのWriteStream)もサポートしている。 -送受信処理の実装 --パラメタの初期化(共通)~ [[SASトークン>#y03d6018]]を使用する場合の設定例 string BOOTSTRAP_SERVERS = "hostname:9093"; // 9093 is the port used to communicate with Event Hubs, see [troubleshooting guide](https://docs.microsoft.com/azure/event-hubs/troubleshooting-guide) string EH_SASL = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"<CONNECTION_STRING>\";"; // Connection string obtained from Step 1 --ReadStream SparkSession spark = SparkSession .Builder() .AppName("Connect Event Hub") .GetOrCreate(); DataFrame df = spark .ReadStream() .Format("kafka") .Option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS) .Option("subscribe", "spark-test") .Option("kafka.sasl.mechanism", "PLAIN") .Option("kafka.security.protocol", "SASL_SSL") .Option("kafka.sasl.jaas.config", EH_SASL) .Option("kafka.request.timeout.ms", "60000") .Option("kafka.session.timeout.ms", "60000") .Option("failOnDataLoss", "false") .Load(); DataFrame dfWrite = df .WriteStream() .OutputMode("append") .Format("console") .Start(); --WriteStream df.WriteStream() .Format("kafka") .Option("topic", topics) .Option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS) .Option("kafka.sasl.mechanism", "PLAIN") .Option("kafka.security.protocol", "SASL_SSL") .Option("kafka.sasl.jaas.config", EH_SASL) .Option("checkpointLocation", "./checkpoint") .Start(); -チュートリアルの書き換え~ [[.NET for Apache Sparkの構造化ストリーミングのチュートリアル>.NET for Apache Sparkチュートリアル#v2288c2e]]の受信部を書き換えて実行してみる。 --書換 // Create initial DataFrame string BOOTSTRAP_SERVERS = "xxxx.servicebus.windows.net:9093"; string CONNECTION_STRING = "Endpoint=sb://xxxx.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=xxxx"; string EH_SASL = $"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{CONNECTION_STRING}\";"; DataFrame lines = spark .ReadStream() .Format("kafka") .Option("subscribe", "test") .Option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS) .Option("kafka.sasl.mechanism", "PLAIN") .Option("kafka.security.protocol", "SASL_SSL") .Option("kafka.sasl.jaas.config", EH_SASL) .Option("kafka.request.timeout.ms", "60000") .Option("kafka.session.timeout.ms", "60000") .Option("kafka.group.id", "$Default") .Option("failOnDataLoss", "false") .Load(); --実行 >spark-submit ^ --class org.apache.spark.deploy.dotnet.DotnetRunner ^ --master local ^ microsoft-spark-3-0_2.12-2.0.0.jar ^ dotnet mySparkStreamingApp.dll ※ エラー発生中(調査中 **Serverの場合 [#g40c913e] サーバーにはならない *参考 [#o8fa57c9] **microsoft.com [#pe74c5ab] ***[[Microsoft Docs > .NET for Apache Spark ガイド>.NET for Apache Spark#w086a731]] [#i96a74c2] -使い方ガイド > データーへの接続 --Azure Data Lake Storage Gen 2 または WASB アカウントに接続する~ https://docs.microsoft.com/ja-jp/dotnet/spark/how-to-guides/connect-to-azure-storage --.NET for Apache Spark を Azure Event Hubs に接続する~ https://docs.microsoft.com/ja-jp/dotnet/spark/how-to-guides/connect-to-event-hub --.NET for Apache Spark を MongoDB に接続する~ https://docs.microsoft.com/ja-jp/dotnet/spark/how-to-guides/connect-to-mongo-db --.NET for Apache Spark を SQL Server に接続する~ https://docs.microsoft.com/ja-jp/dotnet/spark/how-to-guides/connect-to-sql-server ---- Tags: [[:クラウド]], [[:Azure]], [[:.NET開発]], [[:.NET Core]], [[:.NET Standard]]