マイクロソフト系技術情報 Wiki」は、「Open棟梁Project」,「OSSコンソーシアム .NET開発基盤部会」によって運営されています。

目次

概要

.NET で Spark の実装ができる for Apache Spark。

詳細

Sparkの仕様なので、Azure HDInsightAzure Databricksの差異はない。

ファイルの場合

ローカル

ローカル・ファイルを読むこともできる。

Azure HDInsight

マウントされたストレージに直接アップロードしてソレを読む。

Azure Databricks

Databricks CLIを使用してアップロードしてソレを読む。

WASB or ABFS

下記に対するURLを使用することも出来る。

  • WASB[S]: Azure storage.
    wasb://<container_name>@<storage_account_name>.blob.core.windows.net
  • ABFS[S]: Azure Data Lake Storage Gen2.
    abfs://<file_system>@<account_name>.dfs.core.windows.net

Clientの場合

TCP/IPソケット

.Format("socket")

  • ホスト名
    .Option("host", hostname)
  • ポート番号
    .Option("port", port)

Azure Event Hubs

コチラが参考になる。

Apache Kafka

正確には、Azure Event HubsのKafka エンドポイント

  • 受信処理(DataFrame?ReadStream?)の結果をDataFrame?.Show()で出力する。
  • なお、送信処理(DataFrame?WriteStream?)もサポートしている。
  • 送受信処理の実装
    • パラメタの初期化(共通)
      SASトークンを使用する場合の設定例
      string BOOTSTRAP_SERVERS = "hostname:9093"; // 9093 is the port used to communicate with Event Hubs, see [troubleshooting guide](https://docs.microsoft.com/azure/event-hubs/troubleshooting-guide)
      string EH_SASL = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"<CONNECTION_STRING>\";"; // Connection string obtained from Step 1
  • ReadStream?
    SparkSession spark = SparkSession
        .Builder()
        .AppName("Connect Event Hub")
        .GetOrCreate();
    
    DataFrame df = spark
        .ReadStream()
        .Format("kafka")
        .Option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
        .Option("subscribe", "spark-test")
        .Option("kafka.sasl.mechanism", "PLAIN")
        .Option("kafka.security.protocol", "SASL_SSL")
        .Option("kafka.sasl.jaas.config", EH_SASL)
        .Option("kafka.request.timeout.ms", "60000")
        .Option("kafka.session.timeout.ms", "60000")
        .Option("failOnDataLoss", "false")
        .Load();
    
    DataFrame dfWrite = df
        .WriteStream()
        .OutputMode("append")
        .Format("console")
        .Start();
  • WriteStream?
    df.WriteStream()
        .Format("kafka")
        .Option("topic", topics)
        .Option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
        .Option("kafka.sasl.mechanism", "PLAIN")
        .Option("kafka.security.protocol", "SASL_SSL")
        .Option("kafka.sasl.jaas.config", EH_SASL)
        .Option("checkpointLocation", "./checkpoint")
        .Start();
  • 書換
    // Create initial DataFrame
    string BOOTSTRAP_SERVERS = "xxxx.servicebus.windows.net:9093";
    string CONNECTION_STRING = "Endpoint=sb://xxxx.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=xxxx";
    string EH_SASL = $"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{CONNECTION_STRING}\";";
    DataFrame lines = spark
        .ReadStream()
        .Format("kafka")
        .Option("subscribe", "test")
        .Option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
        .Option("kafka.sasl.mechanism", "PLAIN")
        .Option("kafka.security.protocol", "SASL_SSL")
        .Option("kafka.sasl.jaas.config", EH_SASL)
        .Option("kafka.request.timeout.ms", "60000")
        .Option("kafka.session.timeout.ms", "60000")
        .Option("kafka.group.id", "$Default")
        .Option("failOnDataLoss", "false")
        .Load();
  • 実行
    >spark-submit ^
    --class org.apache.spark.deploy.dotnet.DotnetRunner ^
    --master local ^
    microsoft-spark-3-0_2.12-2.0.0.jar ^
    dotnet mySparkStreamingApp.dll
    ※ エラー発生中(調査中

Serverの場合

サーバーにはならない

参考

microsoft.com

Microsoft Docs > .NET for Apache Spark ガイド


Tags: :クラウド, :Azure, :.NET開発, :.NET Core, :.NET Standard


トップ   編集 凍結 差分 バックアップ 添付 複製 名前変更 リロード   新規 一覧 単語検索 最終更新   ヘルプ   最終更新のRSS
Last-modified: 2021-10-20 (水) 16:14:29 (7d)