マイクロソフト系技術情報 Wiki」は、「Open棟梁Project」,「OSSコンソーシアム .NET開発基盤部会」によって運営されています。

目次

概要

Azure Databricksのチュートリアル。

準備

契約

Azureの契約

Databricksの契約

  • Azureの無料試用版サブスクリプションは利用不可との事。
  • ただし、
  • Azureの従量課金制サブスクリプションで
    Databricksの14日間の無料試用版を使用可能。
  • ただし、
  • Databricksユニットには課金されないが、
  • クラスタのVMには課金されるので注意する。

環境

ワークスペース

  • 作成方法
  • ポータルで [リソースの作成] > [分析] > [Azure Databricks] の順に選択
  • 値を指定
  • ワークスペース名
  • サブスクリプション
    任意の値
  • リソース グループ
    既定値は、ワークスペース名に、prefixとしてdatabricks-rg-が付与されたもの。
    databricksXXXと入力すると、databricks-rg-databricksXXX-XXXXになってしまう。
  • 価格レベル
    試用版を選択できる。
  • Virtual Network
    以下の項目は、共に「いいえ」を選択した。
    • Secure Cluster Connectivity による...デプロイ (パブリック IP なし)
    • 自分の仮想ネットワーク (VNet) に...デプロイします
  • ワークスペースの作成には数分かかる。
    • 「デプロイが完了しました」が表示されたら、
    • [リソースに移動]ボタンを押下する。
    • [ワークスペースの起動]ボタンを押下する。
    • Databricks のポータル・サイトに移動する。
  • IPアドレス制限
    プレミアム・ライセンスが必要
  • エンドポイント
    <databricks-instance> = adb-<workspace-id>.<random-number>.azuredatabricks.net
  • トークンの設定
    $ export DATABRICKS_TOKEN=xxxxxx
  • 有効化(WSLで)
    $ curl -X PATCH https://<databricks-instance>/api/2.0/workspace-conf \
    --header "Authorization: Bearer $DATABRICKS_TOKEN" \
    -d '{
      "enableIpAccessLists": "true"
    }'
  • 付与(WSLで)
    $ curl -X POST https://<databricks-instance>/api/2.0/ip-access-lists
    --header "Authorization: Bearer $DATABRICKS_TOKEN" \
    -d '{
      "label": "office",
      "list_type": "ALLOW",
      "ip_addresses": [
        "xxx.xxx.xxx.xxx"
      ]
    }'
  • IaC化
  • Azure CLI
    • 現時点でクイック スタート情報なし。

クラスタ

手順に従いClusterを作成する

  • [New Cluster]を押下
  • 入力
    • Cluster Name : mysparkcluster
    • Cluster Mode : Standard
    • Pool : None
    • Runtime : 6.4 -> 7.3 LTS
    • Autopilot Options
      • ☑ Enable autoscaling
      • ☑ Terminate after 20 minutes of inactivity
      • Worker Type
        ・Standard_DS3_v2
        ・Min Workers 2 Max Workers 8
        ・☑ Spot instances
        ・Driver Type Same as worker
  • [Create Cluster]ボタンを押下
    ※ この手順では、クォータ制限の問題は発生しなかった。

ツール

  • Databricks CLI
    ファイル・システムにアクセスしたりする時に使う。
  • Python 3.6 以降が必要
  • Databricks CLIのインストール
    pip3を使用してインストール
    >pip3 install databricks-cli
  • インストールの確認
    >databricks
    Usage: databricks [OPTIONS] COMMAND [ARGS]...
    
    Options:
      -v, --version   0.14.3
      ...
  • Databricks CLIの設定
    >databricks configure --token
  • ホスト URLの入力
    Databricks Host (should begin with https://): https://<Location>.azuredatabricks.net
  • トークンの取得
    トークンの取得
  • トークンの入力
    Token: xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
  • 以下のようになる(Tokenの所は入力が表示されない)。
    >databricks configure --token
    Databricks Host (should begin with https://): https://.....azuredatabricks.net/
    Token:
    
    >

Scala

Notebookのセルに記述可能。

ETL のチュートリアル

参考中の「...データの抽出、変換、読み込みを行う」

Event Hubsと組み合わせるチュートリアル

参考中の「Event Hubs を使用して...ストリーム配信する」

感情分析のチュートリアル

参考中の「Azure Databricks を使用したストリーミング データに対する感情分析」

参考

Microsoft DocsQiita

Python (PySpark)

Notebookのセルに記述可能。

Docsのクイック スタート

  • 参考中のクイック スタート。
  • Docsの説明が雑なので以下、注釈を加えてみる。

Parquetファイル

  • CSVやTSVファイルのような行指向ファイル形式
    に対し効率的で高性能な列指向ストレージ形式。
  • 読込
    parquet_df = spark.read.parquet("path/to/userdata1.parquet")
  • 表示
    parquet_df.show()
  • 結果
    +-------------------+---+----------+---------+--------------------+------+---------------+-------------------+--------------------+----------+---------+--------------------+--------------------+
    |  registration_dttm| id|first_name|last_name|               email|gender|     ip_address|                 cc|             country| birthdate|   salary|               title|            comments|
    +-------------------+---+----------+---------+--------------------+------+---------------+-------------------+--------------------+----------+---------+--------------------+--------------------+
    |2016-02-03 16:55:29|  1|    Amanda|   Jordan|    ajordan0@com.com|Female|    1.197.201.2|   6759521864920116|           Indonesia|  3/8/1971| 49756.53|    Internal Auditor|               1E+02|
    ...
  • 一応、Spark SQLを実行してみる。
  • Spark SQL
    parquet_df.createOrReplaceTempView('source')
    parquet_df = spark.sql('SELECT * FROM source LIMIT 10')
  • 表示
    print('Displaying top 10 rows: ')
    parquet_df.show()
  • 結果
    Displaying top 10 rows: 
    上記が10行に絞られた結果

Azureストレージへアクセス

その次に、Azureストレージから読込んでみる。

  • 実行
    from pyspark.sql import SparkSession
    spark: SparkSession = SparkSession.builder.appName("SimpleApp").getOrCreate()
    
    blob_account_name = "osscjpdevinfra"
    blob_container_name = "container1"
    blob_relative_path = "userdata1.parquet"
    blob_sas_token = r"?st=xxxxxxxxxxxxxxxxxxxx"
    
    wasbs_path = 'wasbs://%s@%s.blob.core.windows.net/%s' % (blob_container_name, blob_account_name, blob_relative_path)
    spark.conf.set('fs.azure.sas.%s.%s.blob.core.windows.net' % (blob_container_name, blob_account_name), blob_sas_token)
    
    print('Remote blob path: ' + wasbs_path)
    
    parquet_df = spark.read.parquet(wasbs_path)
    parquet_df.show()
  • 結果
    ライブラリが足りないもよう。
    Class org.apache.hadoop.fs.azure.NativeAzureFileSystem$Secure not found

Azure Databricks上で実行

ライブラリが足りないので、Azure Databricks上で実行してみる。

  • ワークスペースの作成(作成のスクリプトはコチラ
  • クラスタの作成(作成の手順はコチラ
  • 以下の手順に従いNotebookを作成する
  • [New Notebook]を押下
  • 入力
    • Name : mynotebook
    • Language : Python
    • Cluster : mysparkcluster
  • [Create]ボタンを押下
  • 先程のコードを実行する。
  • Clusterを開始する。
  • 左上のドロップダウン・リストから作成したClusterを選択し、
  • そこから[Start Cluster]を選択して押下する。
  • 先程のコードをセルに貼り付ける。
  • Shift + Enter キーを使用してコードを実行
  • 無事、実行されたことを確認したら 以下のようにコードを変更して再実行する。
  • 変更前
    parquet_df.show()
  • 変更後
    display(parquet_df)
  • データの視覚的な表現を作成する。
    displayで表示された形式の出力の一番下から、
    [Plot Options]をクリックし、以下のようにする。
    グラフ表示
  • Clusterを停止する。
  • Clusterを開始した際に使ったドロップダウン・リストからCluster管理画面へ飛ぶ。
  • Cluster管理画面で、Terminate or Deleteを選択する。
  • 課金がアレなので、
    ・PoCならリソース・グループごと削除しておいた方が良い。
    ・と言うのも、リソース・グループを見るとClusterのリソースが大量に。

PySparkチュートリアル

クイック スタートも終わったので、PySparkチュートリアルを、Azure Databricksで動かしてみる。

on Jupyter Notebook on Docker

DataFrameに対する様々な操作

EventHubsのKafkaと構造化ストリーミング

PySparkチュートリアル中の構造化ストリーミング入力のKafka化ができなかったので、Azure Databricksでやる。

環境準備

作成のスクリプトは以下

送受信

  • Azure Databricksで受信する。
  • PySparkチュートリアル中の構造化ストリーミング
    入力部分をKafka化したスクリプトを以下に書き出す。
    • ただし、Event HubsのKafkaエンドポイント風に書く。
    • パラメタは純正クライアントを使用した受信処理を参考にする。
      (詳細不明だが、eh_saslの先頭にkafkashadedと言う文字列が必要)
      import sys
      
      from pyspark.sql import SparkSession
      from pyspark.sql.functions import explode
      from pyspark.sql.functions import split
      from pyspark.sql.functions import window
      
      bootstrapServers = "<eventhubsNameSpace>.servicebus.windows.net:9093"
      eh_sasl = 'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="<primaryConnectionString>";'
      
      windowSize = 10
      slideSize  = 10
      if slideSize > windowSize:
          print("<slideSize> must be less than or equal to <windowSize>", file=sys.stderr)
      windowDuration = '{} seconds'.format(windowSize)
      slideDuration = '{} seconds'.format(slideSize)
      
      spark = SparkSession\
          .builder\
          .appName("StructuredNetworkWordCountWindowed")\
          .getOrCreate()
      
      # Create DataFrame representing the stream of input lines from kafka
      lines = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", bootstrapServers) \
        .option("subscribe", "test_topic") \
        .option("kafka.sasl.mechanism", "PLAIN") \
        .option("kafka.security.protocol", "SASL_SSL") \
        .option("kafka.sasl.jaas.config", eh_sasl) \
        .load()
      
      # Split the lines into words, retaining timestamps
      # split() splits each line into an array, and explode() turns the array into multiple rows
      words = lines.select(
          explode(split(lines.value, ' ')).alias('word'),
          lines.timestamp
      )
      
      # Group the data by window and word and compute the count of each group
      windowedCounts = words.groupBy(
          window(words.timestamp, windowDuration, slideDuration),
          words.word
      ).count().orderBy('window')
      
      # Start running the query that prints the windowed word counts to the console
      query = windowedCounts\
          .writeStream\
          .outputMode('complete')\
          .format('console')\
          .option('truncate', 'false')\
          .start()
      
      query.awaitTermination()

writeStream.outputMode('complete')だと標準出力の出力って確認できない?
...と思い、ncコマンドでも試してみたが、やはり出力を確認できなかった。

その他

KcMichael? - Qiita

参考中の...

SQL Serverと組み合わせるチュートリアル

参考中の「...ノートブックから...SQL Server Linux Docker コンテナーのクエリを実行する」

古い?

ちょっと古いのか?アカンやつ。

Azure Databricksを使ってみた

参考中の「Azure Databricksを使ってみた」

...ちと古いし、他にも色々アレなので、パス。

Azure Data Lake のチュートリアル

参考中の

「チュートリアル:Azure Data Lake Storage Gen2、Azure Databricks、および Spark」

...フライト データのダウンロードができない。

参考

Microsoft DocsQiita

.NET

(.NET for Apache Spark)

  • ビルドしたモノをZIPして、spark-submitするのでローカル環境構築が必要。

Get started in 10 minutes

  • 実際に発行&アップロード&実行してみる。

アプリの発行

  • ターゲットはubuntu
    >dotnet publish -c Release -f netcoreapp3.1 -r ubuntu.16.04-x64
  • publishしたものをpublish.zipにまとめる。
    Windowsなので、PowerShellを使用してみる。
    powershell compress-archive publish ..\publish.zip

依存関係のダウンロード

  • 依存関係ファイルをダウンロードする。

※ URL : https://github.com/dotnet/spark/tree/master/deployment

  • install-worker.sh
    Apache Spark クラスタに Worker バイナリをインストール
  • db-init.sh
    • ワーカーとアプリの依存関係を Databricks Spark クラスタに インストール。
    • DOTNET_SPARK_RELEASEを修正するという手順があるが、コレは不要らしい。

※ *.shの行の終わりは Unix 形式 (LF) であることを確認

ファイルのアップロード

次のファイルをアップロードする

  • アプリケーション
  • 一式
  • publish.zip
    発行したアプリ
  • input.txt
    自分は、input.txtをプロジェクト出力に含めていたのでpublishに同梱されているが、
    カレント・ディレクトリが異なるため別途アップロードが必要らしい(根拠は以下のエラー・メッセージ)。
    Microsoft.Spark.JvmException: org.apache.spark.sql.AnalysisException: Path does not exist: dbfs:/input.txt;
  • dbfsにアップロード
    Databricks CLIを使って。
    databricks fs cp input.txt dbfs:/input.txt
    databricks fs cp publish.zip dbfs:/spark-dotnet/publish.zip
    databricks fs cp microsoft-spark-2-4_2.11-1.0.0.jar dbfs:/spark-dotnet/microsoft-spark-2-4_2.11-1.0.0.jar
    ※ input.txtがZIPに同梱されていても、カレント・ディレクトリが異なるのでアップロードが必要。
  • Microsoft.Spark.Worker
  • install-worker.sh
  • db-init.sh
  • dbfsにアップロード
    Databricks CLIを使って。
    databricks fs cp Microsoft.Spark.Worker.netcoreapp3.1.linux-x64-1.0.0.tar.gz dbfs:/spark-dotnet/Microsoft.Spark.Worker.netcoreapp3.1.linux-x64-1.0.0.tar.gz
    databricks fs cp install-worker.sh dbfs:/spark-dotnet/install-worker.sh
    databricks fs cp db-init.sh dbfs:/spark-dotnet/db-init.sh

ジョブを作成して実行

  • spark-submitの構成
    初めに、[Task]の[Type]を[Notebook]から[Spark Submit]に変更する。
  • クラスタ構成の設定
    次に、Clusterの[Edit]を選択し、Configure New Clusterを表示させる。
  • Databricks Runtime Versionを選択
    • Spark 2.4.1が無かったので、最も近い、
    • Runtime: 5.5 LTS (Scala 2.11, Spark 2.4.3)に変更。
  • Initスクリプトを db-init.sh に設定
    • Advanced optionsを展開する。
    • [Init Scripts]タブを選択する。
    • [Destination]ドロップダウンリストでDBFSを選択する。
    • [Init Script Path]に以下を入力する。
      dbfs:/spark-dotnet/db-init.sh
    • [Confirm]を押下する。

※ コレにより、db-init.sh 内で install-worker.sh が構成される($1-3をパラメタライズ)。

  • パラメタの設定
    • パラメタに次の文字列を貼り付ける。
      ["--class","org.apache.spark.deploy.dotnet.DotnetRunner","/dbfs/spark-dotnet/microsoft-spark-2-4_2.11-1.0.0.jar","/dbfs/spark-dotnet/publish.zip","mySparkApp"]
  • 自分は、
  • mySparkApp?MySparkApp? としていたので、その様に変更した。
  • ...と言う事で、実際、パラメタには、次の文字列を貼り付けた。
    ["--class","org.apache.spark.deploy.dotnet.DotnetRunner","/dbfs/spark-dotnet/microsoft-spark-2-4_2.11-1.0.0.jar","/dbfs/spark-dotnet/publish.zip","MySparkApp", "input.txt"]
  • [Create]を押下する。
  • アプリの実行
  • ジョブのクラスタが作成されるまで数分かかる。
    (リソース・グループを確認するとクラスタのリソース増が確認できる)
  • Job名の横にある [Run Now] ボタンをクリックする。
  • Jobで構成した Spark クラスタでJobが実行される。
  • 実行結果は、Completed Runs表中のSpark列に表示されるLogsから確認できる。
    実行の結果
  • リソースのクリーンアップ
    • リソース・グループを確認するとクラスタのリソース減が確認できる
    • 必要に応じて、(ワークスペースとクラスタの)リソース・グループを削除する。

参考

  • エラー
  • Cluster:
    Driver: Standard_DS3_v2,
    Workers: Standard_DS3_v2,
    8 workers, 5.5 LTS (includes Apache Spark 2.4.3, Scala 2.11)
  • Message:
    Unexpected failure while waiting for the cluster (...) to be ready.
    Cause Unexpected state for cluster (...):
    AZURE_QUOTA_EXCEEDED_EXCEPTION(CLIENT_ERROR):~
    azure_error_code : QuotaExceeded,~
    azure_error_message :
      Operation could not be completed as it results in exceeding approved Total Regional Cores quota.
      Additional details -
      Deployment Model: Resource Manager,
      Location: japaneast,
      Current Limit: 10,
      Current Usage: 4,
      Additional Required: 28,
      (Minimum) New Limit Required: 32.
      Submit a request for Quota increase at
      https://aka.ms/ProdportalCRP/?#create/Microsoft.Support/Parameters/%7B%22subId%22:%2232e43be6-1930-4982-a62c-a259327e5c77%22,%22pesId%22:%2206bfd9d3-516b-d5c6-5802-169c800dec89%22,%22supportTopicId%22:%22e12e3d1d-7fa0-af33-c6d0-3c50df9658a3%22%7D
      by specifying parameters listed in the 'Details' section for deployment to succeed.
      Pleas ... ***WARNING: message truncated. Skipped 964 bytes of output**
  • クォータ制限を解除する。
    以下のクォータを32に設定する。
    ・Total Regional vCPUsのクォータ
    ・Standard_DS3_v2 → DSv2 シリーズ(Standard DSv2 Promo Family vCPUs)のクォータ

  • input.txtの扱いについて、

上記と同様に行ってみる例

ワークスペースをクリーンナップするか新規作成する必要があるので、
1つのバッチにまとめて、コマンドライン引数で切り替えるのが良いかも。

バッチ処理

  • アプリ
    • publish.zipの発行とアップロード
      cd ...\DotNet4ApacheSpark\mySparkBatchApp\mySparkBatchApp
      dotnet publish -c Release -f netcoreapp3.1 -r ubuntu.16.04-x64
      cd ...\mySparkBatchApp\bin\Release\netcoreapp3.1\ubuntu.16.04-x64
      powershell compress-archive publish ..\publish.zip
      cd ..
      databricks fs rm dbfs:/spark-dotnet/publish.zip
      databricks fs cp publish.zip dbfs:/spark-dotnet/publish.zip
  • projects_smaller.csvのアップロード
    databricks fs cp projects_smaller.csv dbfs:/projects_smaller.csv
  • spark-submitのパラメタ設定
    ["--class","org.apache.spark.deploy.dotnet.DotnetRunner","/dbfs/spark-dotnet/microsoft-spark-2-4_2.11-1.0.0.jar","/dbfs/spark-dotnet/publish.zip","mySparkBatchApp", "projects_smaller.csv"]
  • エラー
    何故か、最後のDataFrame?.Showでエラーになって
    4つ目の結果セットが表示されなかった(原因不明)。

構造化ストリーミング

未実施

ML.NETでの感情分析

未実施

Java

可能らしいがサンプルが無いのは、Scalaでやるから?

...

参考

databricks.com

microsoft.com

Scala

Python (PySpark?)

Qiita

Scala

Python (PySpark?)


Tags: :クラウド, :ビッグデータ, :Azure


添付ファイル: file無題.png 29件 [詳細] file無題1.png 22件 [詳細] file無題2.png 23件 [詳細]

トップ   編集 凍結 差分 バックアップ 添付 複製 名前変更 リロード   新規 一覧 単語検索 最終更新   ヘルプ   最終更新のRSS
Last-modified: 2021-10-21 (木) 12:45:14 (3d)