「マイクロソフト系技術情報 Wiki」は、「Open棟梁Project」,「OSSコンソーシアム .NET開発基盤部会」によって運営されています。
Azure Databricksのチュートリアル。
既存のサブスクリプションで試用版を使用する場合、
ワークスペース作成の価格レベルで試用版を選択する。
<databricks-instance> = adb-<workspace-id>.<random-number>.azuredatabricks.net
$ export DATABRICKS_TOKEN=xxxxxx
$ curl -X PATCH https://<databricks-instance>/api/2.0/workspace-conf \ --header "Authorization: Bearer $DATABRICKS_TOKEN" \ -d '{ "enableIpAccessLists": "true" }'
$ curl -X POST https://<databricks-instance>/api/2.0/ip-access-lists --header "Authorization: Bearer $DATABRICKS_TOKEN" \ -d '{ "label": "office", "list_type": "ALLOW", "ip_addresses": [ "xxx.xxx.xxx.xxx" ] }'
>Connect-AzAccount >Set-AzContext -SubscriptionId ... >Register-AzResourceProvider -ProviderNamespace Microsoft.Databricks >New-AzDatabricksWorkspace -Name [名称] -ResourceGroupName [既存のRG名] -Location [場所(リージョン)] -ManagedResourceGroupName [ManagedRG名] -Sku [sku]
手順に従いClusterを作成する
>pip3 install databricks-cli
>databricks Usage: databricks [OPTIONS] COMMAND [ARGS]... Options: -v, --version 0.14.3 ...
>databricks configure --token
Databricks Host (should begin with https://): https://<Location>.azuredatabricks.net
Token: xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
>databricks configure --token Databricks Host (should begin with https://): https://.....azuredatabricks.net/ Token: >
Notebookのセルに記述可能。
参考中の「...データの抽出、変換、読み込みを行う」
参考中の「Event Hubs を使用して...ストリーム配信する」
参考中の「Azure Databricks を使用したストリーミング データに対する感情分析」
Notebookのセルに記述可能。
parquet_df = spark.read.parquet("path/to/userdata1.parquet")
parquet_df.show()
+-------------------+---+----------+---------+--------------------+------+---------------+-------------------+--------------------+----------+---------+--------------------+--------------------+ | registration_dttm| id|first_name|last_name| email|gender| ip_address| cc| country| birthdate| salary| title| comments| +-------------------+---+----------+---------+--------------------+------+---------------+-------------------+--------------------+----------+---------+--------------------+--------------------+ |2016-02-03 16:55:29| 1| Amanda| Jordan| ajordan0@com.com|Female| 1.197.201.2| 6759521864920116| Indonesia| 3/8/1971| 49756.53| Internal Auditor| 1E+02| ...
parquet_df.createOrReplaceTempView('source') parquet_df = spark.sql('SELECT * FROM source LIMIT 10')
print('Displaying top 10 rows: ') parquet_df.show()
Displaying top 10 rows: 上記が10行に絞られた結果
その次に、Azureストレージから読込んでみる。
from pyspark.sql import SparkSession spark: SparkSession = SparkSession.builder.appName("SimpleApp").getOrCreate() blob_account_name = "osscjpdevinfra" blob_container_name = "container1" blob_relative_path = "userdata1.parquet" blob_sas_token = r"?st=xxxxxxxxxxxxxxxxxxxx" wasbs_path = 'wasbs://%s@%s.blob.core.windows.net/%s' % (blob_container_name, blob_account_name, blob_relative_path) spark.conf.set('fs.azure.sas.%s.%s.blob.core.windows.net' % (blob_container_name, blob_account_name), blob_sas_token) print('Remote blob path: ' + wasbs_path) parquet_df = spark.read.parquet(wasbs_path) parquet_df.show()
Class org.apache.hadoop.fs.azure.NativeAzureFileSystem$Secure not found
ライブラリが足りないので、Azure Databricks上で実行してみる。
parquet_df.show()
display(parquet_df)
クイック スタートも終わったので、PySparkチュートリアルを、Azure Databricksで動かしてみる。
databricks.comのApache Spark チュートリアル(ストリーミング)を、Azure Databricksで動かしてみる。
作成のスクリプトは以下
readStreamでinputPathをmaxFilesPerTrigger?で読む...みたいな感じになる。
from pyspark.sql.functions import * # Similar to definition of staticInputDF above, just using `readStream` instead of `read` streamingInputDF = ( spark .readStream .schema(jsonSchema) # Set the schema of the JSON data .option("maxFilesPerTrigger", 1) # Treat a sequence of files as a stream by picking one file at a time .json(inputPath) ) # Same query as staticInputDF streamingCountsDF = ( streamingInputDF .groupBy( streamingInputDF.action, window(streamingInputDF.time, "1 hour")) .count() )
インタラクティブにやる場合、
query = ( streamingCountsDF .writeStream .format("memory") # memory = store in-memory table .queryName("counts") # counts = name of the in-memory table .outputMode("complete") # complete = all the counts should be in the table .start() )
%sql select action, date_format(window.end, "MMM-dd HH:mm") as time, count from counts order by time, action
みたいな方法になる。
PySparkチュートリアル中の構造化ストリーミングの入力のKafka化ができなかった
(コンテナのJupyter NotebookからコンテナのKafkaに接続できなかった)ので、
Azure Databricksでリトライ。
作成のスクリプトは以下
import sys from pyspark.sql import SparkSession from pyspark.sql.functions import explode from pyspark.sql.functions import split from pyspark.sql.functions import window bootstrapServers = "<eventhubsNameSpace>.servicebus.windows.net:9093" eh_sasl = 'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="<primaryConnectionString>";' windowSize = 10 slideSize = 10 if slideSize > windowSize: print("<slideSize> must be less than or equal to <windowSize>", file=sys.stderr) windowDuration = '{} seconds'.format(windowSize) slideDuration = '{} seconds'.format(slideSize) spark = SparkSession\ .builder\ .appName("StructuredNetworkWordCountWindowed")\ .getOrCreate() # Create DataFrame representing the stream of input lines from kafka lines = spark \ .readStream \ .format("kafka") \ .option("kafka.bootstrap.servers", bootstrapServers) \ .option("subscribe", "test_topic") \ .option("kafka.sasl.mechanism", "PLAIN") \ .option("kafka.security.protocol", "SASL_SSL") \ .option("kafka.sasl.jaas.config", eh_sasl) \ .load() # Split the lines into words, retaining timestamps # split() splits each line into an array, and explode() turns the array into multiple rows words = lines.select( explode(split(lines.value, ' ')).alias('word'), lines.timestamp ) # Group the data by window and word and compute the count of each group windowedCounts = words.groupBy( window(words.timestamp, windowDuration, slideDuration), words.word ).count().orderBy('window') # Start running the query that prints the windowed word counts to the console query = windowedCounts\ .writeStream\ .outputMode('complete')\ .format('console')\ .option('truncate', 'false')\ .start() query.awaitTermination()
# Start running the query that prints the windowed word counts to the console query = windowedCounts\ .writeStream\ .format("memory")\ .queryName("counts")\ .outputMode("complete")\ .start() # query.awaitTermination()
%sql select * from counts
参考中の...
参考中の「...ノートブックから...SQL Server Linux Docker コンテナーのクエリを実行する」
ちょっと古いのか?アカンやつ。
参考中の「Azure Databricksを使ってみた」
...ちと古いし、他にも色々アレなので、パス。
参考中の
「チュートリアル:Azure Data Lake Storage Gen2、Azure Databricks、および Spark」
...フライト データのダウンロードができない。
(.NET for Apache Spark)
>dotnet publish -c Release -f netcoreapp3.1 -r ubuntu.16.04-x64
powershell compress-archive publish ..\publish.zip
※ URL : https://github.com/dotnet/spark/tree/master/deployment
※ *.shの行の終わりは Unix 形式 (LF) であることを確認
次のファイルをアップロードする
Microsoft.Spark.JvmException: org.apache.spark.sql.AnalysisException: Path does not exist: dbfs:/input.txt;
databricks fs cp input.txt dbfs:/input.txt databricks fs cp publish.zip dbfs:/spark-dotnet/publish.zip databricks fs cp microsoft-spark-2-4_2.11-1.0.0.jar dbfs:/spark-dotnet/microsoft-spark-2-4_2.11-1.0.0.jar※ input.txtがZIPに同梱されていても、カレント・ディレクトリが異なるのでアップロードが必要。
databricks fs cp Microsoft.Spark.Worker.netcoreapp3.1.linux-x64-1.0.0.tar.gz dbfs:/spark-dotnet/Microsoft.Spark.Worker.netcoreapp3.1.linux-x64-1.0.0.tar.gz databricks fs cp install-worker.sh dbfs:/spark-dotnet/install-worker.sh databricks fs cp db-init.sh dbfs:/spark-dotnet/db-init.sh
dbfs:/spark-dotnet/db-init.sh
※ コレにより、db-init.sh 内で install-worker.sh が構成される($1-3をパラメタライズ)。
["--class","org.apache.spark.deploy.dotnet.DotnetRunner","/dbfs/spark-dotnet/microsoft-spark-2-4_2.11-1.0.0.jar","/dbfs/spark-dotnet/publish.zip","mySparkApp"]
.NET for Apache Spark ジョブを Databricks に送信する | Microsoft Docs
https://docs.microsoft.com/ja-jp/dotnet/spark/how-to-guides/databricks-deploy-methods
["--class","org.apache.spark.deploy.dotnet.DotnetRunner","/dbfs/spark-dotnet/microsoft-spark-2-4_2.11-1.0.0.jar","/dbfs/spark-dotnet/publish.zip","MySparkApp", "input.txt"]
Driver: Standard_DS3_v2, Workers: Standard_DS3_v2, 8 workers, 5.5 LTS (includes Apache Spark 2.4.3, Scala 2.11)
Unexpected failure while waiting for the cluster (...) to be ready. Cause Unexpected state for cluster (...): AZURE_QUOTA_EXCEEDED_EXCEPTION(CLIENT_ERROR):~ azure_error_code : QuotaExceeded,~ azure_error_message : Operation could not be completed as it results in exceeding approved Total Regional Cores quota. Additional details - Deployment Model: Resource Manager, Location: japaneast, Current Limit: 10, Current Usage: 4, Additional Required: 28, (Minimum) New Limit Required: 32. Submit a request for Quota increase at https://aka.ms/ProdportalCRP/?#create/Microsoft.Support/Parameters/%7B%22subId%22:%2232e43be6-1930-4982-a62c-a259327e5c77%22,%22pesId%22:%2206bfd9d3-516b-d5c6-5802-169c800dec89%22,%22supportTopicId%22:%22e12e3d1d-7fa0-af33-c6d0-3c50df9658a3%22%7D by specifying parameters listed in the 'Details' section for deployment to succeed. Pleas ... ***WARNING: message truncated. Skipped 964 bytes of output**
ワークスペースをクリーンナップするか新規作成する必要があるので、
1つのバッチにまとめて、コマンドライン引数で切り替えるのが良いかも。
cd ...\DotNet4ApacheSpark\mySparkBatchApp\mySparkBatchApp dotnet publish -c Release -f netcoreapp3.1 -r ubuntu.16.04-x64 cd ...\mySparkBatchApp\bin\Release\netcoreapp3.1\ubuntu.16.04-x64 powershell compress-archive publish ..\publish.zip cd .. databricks fs rm dbfs:/spark-dotnet/publish.zip databricks fs cp publish.zip dbfs:/spark-dotnet/publish.zip
databricks fs cp projects_smaller.csv dbfs:/projects_smaller.csv
["--class","org.apache.spark.deploy.dotnet.DotnetRunner","/dbfs/spark-dotnet/microsoft-spark-2-4_2.11-1.0.0.jar","/dbfs/spark-dotnet/publish.zip","mySparkBatchApp", "projects_smaller.csv"]
未実施
未実施
可能らしいがサンプルが無いのは、Scalaでやるから?
Apache Spark チュートリアル: Apache Spark チュートリアルを開始する