「マイクロソフト系技術情報 Wiki」は、「Open棟梁Project」,「OSSコンソーシアム .NET開発基盤部会」によって運営されています。
目次 †
概要 †
.NET で Spark の実装ができる for Apache Spark。
詳細 †
Sparkの仕様なので、Azure HDInsight・Azure Databricksの差異はない。
ファイルの場合 †
ローカル †
ローカル・ファイルを読むこともできる。
マウントされたストレージに直接アップロードしてソレを読む。
Databricks CLIを使用してアップロードしてソレを読む。
WASB or ABFS †
下記に対するURLを使用することも出来る。
- WASB[S]: Azure storage.
wasb://<container_name>@<storage_account_name>.blob.core.windows.net
- ABFS[S]: Azure Data Lake Storage Gen2.
abfs://<file_system>@<account_name>.dfs.core.windows.net
Clientの場合 †
TCP/IPソケット †
.Format("socket")
- ホスト名
.Option("host", hostname)
- ポート番号
.Option("port", port)
コチラが参考になる。
Apache Kafka †
正確には、Azure Event HubsのKafka エンドポイント
- 受信処理(DataFrame?のReadStream?)の結果をDataFrame?.Show()で出力する。
- なお、送信処理(DataFrame?のWriteStream?)もサポートしている。
- 送受信処理の実装
- パラメタの初期化(共通)
SASトークンを使用する場合の設定例
string BOOTSTRAP_SERVERS = "hostname:9093"; // 9093 is the port used to communicate with Event Hubs, see [troubleshooting guide](https://docs.microsoft.com/azure/event-hubs/troubleshooting-guide)
string EH_SASL = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"<CONNECTION_STRING>\";"; // Connection string obtained from Step 1
- ReadStream?
SparkSession spark = SparkSession
.Builder()
.AppName("Connect Event Hub")
.GetOrCreate();
DataFrame df = spark
.ReadStream()
.Format("kafka")
.Option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
.Option("subscribe", "spark-test")
.Option("kafka.sasl.mechanism", "PLAIN")
.Option("kafka.security.protocol", "SASL_SSL")
.Option("kafka.sasl.jaas.config", EH_SASL)
.Option("kafka.request.timeout.ms", "60000")
.Option("kafka.session.timeout.ms", "60000")
.Option("failOnDataLoss", "false")
.Load();
DataFrame dfWrite = df
.WriteStream()
.OutputMode("append")
.Format("console")
.Start();
- WriteStream?
df.WriteStream()
.Format("kafka")
.Option("topic", topics)
.Option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
.Option("kafka.sasl.mechanism", "PLAIN")
.Option("kafka.security.protocol", "SASL_SSL")
.Option("kafka.sasl.jaas.config", EH_SASL)
.Option("checkpointLocation", "./checkpoint")
.Start();
- 書換
// Create initial DataFrame
string BOOTSTRAP_SERVERS = "xxxx.servicebus.windows.net:9093";
string CONNECTION_STRING = "Endpoint=sb://xxxx.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=xxxx";
string EH_SASL = $"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{CONNECTION_STRING}\";";
DataFrame lines = spark
.ReadStream()
.Format("kafka")
.Option("subscribe", "test")
.Option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
.Option("kafka.sasl.mechanism", "PLAIN")
.Option("kafka.security.protocol", "SASL_SSL")
.Option("kafka.sasl.jaas.config", EH_SASL)
.Option("kafka.request.timeout.ms", "60000")
.Option("kafka.session.timeout.ms", "60000")
.Option("kafka.group.id", "$Default")
.Option("failOnDataLoss", "false")
.Load();
Serverの場合 †
サーバーにはならない
参考 †
microsoft.com †
Tags: :クラウド, :Azure, :.NET開発, :.NET Core, :.NET Standard