「マイクロソフト系技術情報 Wiki」は、「Open棟梁Project」,「OSSコンソーシアム .NET開発基盤部会」によって運営されています。
目次 †
概要 †
Azure Databricksのチュートリアル。
準備 †
契約 †
Databricksの契約 †
- Azureの無料試用版サブスクリプションは利用不可との事。
- ただし、Azureの従量課金制サブスクリプションで
Databricksの14日間の無料試用版を使用可能。
環境 †
ワークスペース †
- ポータルで [リソースの作成] > [分析] > [Azure Databricks] の順に選択
- ワークスペース名
mydatabrickswsとか
- リソース グループ
- DplRGとか
- 既定値は、ワークスペース名に、prefixとしてdatabricks-rg-が付与されたもの。
databricksXXXと入力すると、databricks-rg-databricksXXX-XXXXになってしまう。
- Virtual Network
以下の項目は、共に「いいえ」を選択した。
- Secure Cluster Connectivity による...デプロイ (パブリック IP なし)
- 自分の仮想ネットワーク (VNet) に...デプロイします
- [作成]ボタンを押下
ワークスペースの作成には数分かかる。
- 「デプロイが完了しました」が表示されたら、
- [リソースに移動]ボタンを押下する。
- [ワークスペースの起動]ボタンを押下する。
- Databricks のポータル・サイトに移動する。
- 付与(WSLで)
$ curl -X POST https://<databricks-instance>/api/2.0/ip-access-lists
--header "Authorization: Bearer $DATABRICKS_TOKEN" \
-d '{
"label": "office",
"list_type": "ALLOW",
"ip_addresses": [
"xxx.xxx.xxx.xxx"
]
}'
クラスタ †
手順に従いClusterを作成する
- 入力
- Cluster Name : mysparkcluster
- Cluster Mode : Standard
- Pool : None
- Runtime : 6.4 -> 7.3 LTS
- Autopilot Options
- ☑ Enable autoscaling
- ☑ Terminate after 20 minutes of inactivity
- Worker Type
・Standard_DS3_v2
・Min Workers 2 Max Workers 8
・☑ Spot instances
・Driver Type Same as worker
- [Create Cluster]ボタンを押下
※ この手順では、クォータ制限の問題は発生しなかった(発生した場合はコチラ)。
Notebook †
- 入力
- Name : mynotebook
- Language : Python
- Cluster : mysparkcluster
- Notebook上でインタラクティブ実行してみる。
ツール †
- Databricks CLI
ファイル・システムにアクセスしたりする時に使う。
Scala †
Notebookのセルに記述可能。
ETL のチュートリアル †
参考中の「...データの抽出、変換、読み込みを行う」
Event Hubsと組み合わせるチュートリアル †
参考中の「Event Hubs を使用して...ストリーム配信する」
感情分析のチュートリアル †
参考中の「Azure Databricks を使用したストリーミング データに対する感情分析」
参考 †
Microsoft Docs、Qiita
Notebookのセルに記述可能。
Docsのクイック スタート †
- 参考中のクイック スタート。
- Docsの説明が雑なので以下、注釈を加えてみる。
Parquetファイル †
- CSVやTSVファイルのような行指向ファイル形式
に対し効率的で高性能な列指向ストレージ形式。
- 読込
parquet_df = spark.read.parquet("path/to/userdata1.parquet")
- 表示
parquet_df.show()
- 結果
+-------------------+---+----------+---------+--------------------+------+---------------+-------------------+--------------------+----------+---------+--------------------+--------------------+
| registration_dttm| id|first_name|last_name| email|gender| ip_address| cc| country| birthdate| salary| title| comments|
+-------------------+---+----------+---------+--------------------+------+---------------+-------------------+--------------------+----------+---------+--------------------+--------------------+
|2016-02-03 16:55:29| 1| Amanda| Jordan| ajordan0@com.com|Female| 1.197.201.2| 6759521864920116| Indonesia| 3/8/1971| 49756.53| Internal Auditor| 1E+02|
...
その次に、Azureストレージから読込んでみる。
- 実行
from pyspark.sql import SparkSession
spark: SparkSession = SparkSession.builder.appName("SimpleApp").getOrCreate()
blob_account_name = "osscjpdevinfra"
blob_container_name = "container1"
blob_relative_path = "userdata1.parquet"
blob_sas_token = r"?st=xxxxxxxxxxxxxxxxxxxx"
wasbs_path = 'wasbs://%s@%s.blob.core.windows.net/%s' % (blob_container_name, blob_account_name, blob_relative_path)
spark.conf.set('fs.azure.sas.%s.%s.blob.core.windows.net' % (blob_container_name, blob_account_name), blob_sas_token)
print('Remote blob path: ' + wasbs_path)
parquet_df = spark.read.parquet(wasbs_path)
parquet_df.show()
ライブラリが足りないので、Azure Databricks上で実行してみる。
- 前提
- ワークスペースの作成(作成のスクリプトはコチラ
- クラスタの作成(作成の手順はコチラ
- Notebookの作成する(作成の手順はコチラ
- 左上のドロップダウン・リストから作成したClusterを選択し、
- そこから[Start Cluster]を選択して押下する。
- Shift + Enter キーを使用してコードを実行
- 無事、実行されたことを確認したら
以下のようにコードを変更して再実行する。
- データの視覚的な表現を作成する。
displayで表示された形式の出力の一番下から、
[Plot Options]をクリックし、以下のようにする。
- Clusterを開始した際に使ったドロップダウン・リストからCluster管理画面へ飛ぶ。
- Cluster管理画面で、Terminate or Deleteを選択する。
- 課金がアレなので、
・PoCならリソース・グループごと削除しておいた方が良い。
・と言うのも、リソース・グループを見るとClusterのリソースが大量に。
クイック スタートも終わったので、PySparkチュートリアルを、Azure Databricksで動かしてみる。
静的ファイルで構造化ストリーミングをエミュレート †
databricks.comのApache Spark チュートリアル(ストリーミング)を、Azure Databricksで動かしてみる。
環境準備 †
作成のスクリプトは以下
エミュレート †
readStreamでinputPathをmaxFilesPerTrigger?で読む...みたいな感じになる。
from pyspark.sql.functions import *
# Similar to definition of staticInputDF above, just using `readStream` instead of `read`
streamingInputDF = (
spark
.readStream
.schema(jsonSchema) # Set the schema of the JSON data
.option("maxFilesPerTrigger", 1) # Treat a sequence of files as a stream by picking one file at a time
.json(inputPath)
)
# Same query as staticInputDF
streamingCountsDF = (
streamingInputDF
.groupBy(
streamingInputDF.action,
window(streamingInputDF.time, "1 hour"))
.count()
)
出力方法 †
インタラクティブにやる場合、
- 「%sql」のマジックコマンドでセレクトする。
%sql select action, date_format(window.end, "MMM-dd HH:mm") as time, count from counts order by time, action
みたいな方法になる。
PySparkチュートリアル中の構造化ストリーミングの入力のKafka化ができなかった
(コンテナのJupyter NotebookからコンテナのKafkaに接続できなかった)ので、
Azure Databricksでリトライ。
環境準備 †
作成のスクリプトは以下
送受信 †
- PySparkチュートリアル中の構造化ストリーミングの
入力部分をKafka化したスクリプトを以下に書き出す。
- ただし、Event HubsのKafkaエンドポイント風に書く。
- パラメタは純正クライアントを使用した受信処理を参考にする。
(詳細不明だが、eh_saslの先頭にkafkashadedと言う文字列が必要)
import sys
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
from pyspark.sql.functions import window
bootstrapServers = "<eventhubsNameSpace>.servicebus.windows.net:9093"
eh_sasl = 'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="<primaryConnectionString>";'
windowSize = 10
slideSize = 10
if slideSize > windowSize:
print("<slideSize> must be less than or equal to <windowSize>", file=sys.stderr)
windowDuration = '{} seconds'.format(windowSize)
slideDuration = '{} seconds'.format(slideSize)
spark = SparkSession\
.builder\
.appName("StructuredNetworkWordCountWindowed")\
.getOrCreate()
# Create DataFrame representing the stream of input lines from kafka
lines = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", bootstrapServers) \
.option("subscribe", "test_topic") \
.option("kafka.sasl.mechanism", "PLAIN") \
.option("kafka.security.protocol", "SASL_SSL") \
.option("kafka.sasl.jaas.config", eh_sasl) \
.load()
# Split the lines into words, retaining timestamps
# split() splits each line into an array, and explode() turns the array into multiple rows
words = lines.select(
explode(split(lines.value, ' ')).alias('word'),
lines.timestamp
)
# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
window(words.timestamp, windowDuration, slideDuration),
words.word
).count().orderBy('window')
# Start running the query that prints the windowed word counts to the console
query = windowedCounts\
.writeStream\
.outputMode('complete')\
.format('console')\
.option('truncate', 'false')\
.start()
query.awaitTermination()
- 構造化ストリーミングでは(?)、
consoleへの出力が確認できなかったので、
インタラクティブにやる場合、
その他 †
KcMichael? - Qiita †
参考中の...
SQL Serverと組み合わせるチュートリアル †
参考中の「...ノートブックから...SQL Server Linux Docker コンテナーのクエリを実行する」
古い? †
ちょっと古いのか?アカンやつ。
Azure Databricksを使ってみた †
参考中の「Azure Databricksを使ってみた」
...ちと古いし、他にも色々アレなので、パス。
参考中の
「チュートリアル:Azure Data Lake Storage Gen2、Azure Databricks、および Spark」
...フライト データのダウンロードができない。
参考 †
Microsoft Docs、Qiita
.NET †
(.NET for Apache Spark)
- ビルドしたモノをZIPして、spark-submitするのでローカル環境構築が必要。
Get started in 10 minutes †
アプリの発行 †
依存関係のダウンロード †
※ URL : https://github.com/dotnet/spark/tree/master/deployment
- install-worker.sh
Apache Spark クラスタに Worker バイナリをインストール
- db-init.sh
- ワーカーとアプリの依存関係を Databricks Spark クラスタに インストール。
- DOTNET_SPARK_RELEASEを修正するという手順があるが、コレは不要らしい。
※ *.shの行の終わりは Unix 形式 (LF) であることを確認
ファイルのアップロード †
次のファイルをアップロードする
- microsoft-spark-2-4_2.11-1.0.0.jar
ローカル実行で使用したもの(publish中で使用したバージョン)
- Microsoft.Spark.Worker
- install-worker.sh
- db-init.sh
ジョブを作成して実行 †
- 以下、UIが変更されているので、以下を参考にする。
- spark-submitの構成
初めに、[Task]の[Type]を[Notebook]から[Spark Submit]に変更する。
- クラスタ構成の設定
次に、Clusterの[Edit]を選択し、Configure New Clusterを表示させる。
- Databricks Runtime Versionを選択
- Spark 2.4.1が無かったので、最も近い、
- Runtime: 5.5 LTS (Scala 2.11, Spark 2.4.3)に変更。
- Initスクリプトを db-init.sh に設定
※ コレにより、db-init.sh 内で install-worker.sh が構成される($1-3をパラメタライズ)。
- パラメタの設定
- パラメタに次の文字列を貼り付ける。
["--class","org.apache.spark.deploy.dotnet.DotnetRunner","/dbfs/spark-dotnet/microsoft-spark-2-4_2.11-1.0.0.jar","/dbfs/spark-dotnet/publish.zip","mySparkApp"]
- mySparkApp? → MySparkApp? としていたので、その様に変更した。
- ...と言う事で、実際、パラメタには、次の文字列を貼り付けた。
["--class","org.apache.spark.deploy.dotnet.DotnetRunner","/dbfs/spark-dotnet/microsoft-spark-2-4_2.11-1.0.0.jar","/dbfs/spark-dotnet/publish.zip","MySparkApp", "input.txt"]
- ジョブのクラスタが作成されるまで数分かかる。
(リソース・グループを確認するとクラスタのリソース増が確認できる)
- Job名の横にある [Run Now] ボタンをクリックする。
- Jobで構成した Spark クラスタでJobが実行される。
- 実行結果は、Completed Runs表中のSpark列に表示されるLogsから確認できる。
- リソースのクリーンアップ
- リソース・グループを確認するとクラスタのリソース減が確認できる
- 必要に応じて、(ワークスペースとクラスタの)リソース・グループを削除する。
参考 †
- Cluster:
Driver: Standard_DS3_v2,
Workers: Standard_DS3_v2,
8 workers, 5.5 LTS (includes Apache Spark 2.4.3, Scala 2.11)
- Message:
Unexpected failure while waiting for the cluster (...) to be ready.
Cause Unexpected state for cluster (...):
AZURE_QUOTA_EXCEEDED_EXCEPTION(CLIENT_ERROR):~
azure_error_code : QuotaExceeded,~
azure_error_message :
Operation could not be completed as it results in exceeding approved Total Regional Cores quota.
Additional details -
Deployment Model: Resource Manager,
Location: japaneast,
Current Limit: 10,
Current Usage: 4,
Additional Required: 28,
(Minimum) New Limit Required: 32.
Submit a request for Quota increase at
https://aka.ms/ProdportalCRP/?#create/Microsoft.Support/Parameters/%7B%22subId%22:%2232e43be6-1930-4982-a62c-a259327e5c77%22,%22pesId%22:%2206bfd9d3-516b-d5c6-5802-169c800dec89%22,%22supportTopicId%22:%22e12e3d1d-7fa0-af33-c6d0-3c50df9658a3%22%7D
by specifying parameters listed in the 'Details' section for deployment to succeed.
Pleas ... ***WARNING: message truncated. Skipped 964 bytes of output**
- クォータ制限を解除する。
以下のクォータを32に設定する。
・Total Regional vCPUsのクォータ
・Standard_DS3_v2 → DSv2 シリーズ(Standard DSv2 Promo Family vCPUs)のクォータ
上記と同様に行ってみる例 †
ワークスペースをクリーンナップするか新規作成する必要があるので、
1つのバッチにまとめて、コマンドライン引数で切り替えるのが良いかも。
- spark-submitのパラメタ設定
["--class","org.apache.spark.deploy.dotnet.DotnetRunner","/dbfs/spark-dotnet/microsoft-spark-2-4_2.11-1.0.0.jar","/dbfs/spark-dotnet/publish.zip","mySparkBatchApp", "projects_smaller.csv"]
- エラー
何故か、最後のDataFrame?.Showでエラーになって
4つ目の結果セットが表示されなかった(原因不明)。
未実施
未実施
Java †
可能らしいがサンプルが無いのは、Scalaでやるから?
... †
参考 †
databricks.com †
Apache Spark チュートリアル: Apache Spark チュートリアルを開始する
microsoft.com †
Scala †
Python (PySpark?) †
Getting Started †
Perform Data Science - Learn †
構造化ストリーミング †
Azure Machine Learning †
Qiita †
Scala †
Python (PySpark?) †
Tags: :クラウド, :ビッグデータ, :Azure