&

このチュートリアルでは、Azure Data Lake Storage Gen2 対応の Azure ストレージ アカウント内の格納データに Azure Databricks クラスターを接続する方法を説明します。 この接続を使用することで、必要なデータに関するクエリや分析をクラスターからネイティブに実行することができます。

このチュートリアルでは、次のことについて説明します。

  • 非構造化データをストレージ アカウントに取り込む
  • Blob Storage 内のデータに対して分析を実行する

Azure サブスクリプションをお持ちでない場合は、開始する前に 無料アカウント を作成してください。

前提条件

フライト データのダウンロード

このチュートリアルでは、運輸統計局からのフライト データを使用して ETL 操作を実行する方法を示します。 チュートリアルを完了するには、このデータをダウンロードする必要があります。

  1. On_Time_Reporting_Carrier_On_Time_Performance_1987_present_2016_1.zip ファイルをダウンロードします。 このファイルには、フライト データが含まれています。

  2. ZIP ファイルの内容を解凍し、ファイル名とファイル パスをメモします。 この情報は後の手順で必要になります。

データの取り込み

ソース データをストレージ アカウントにコピーする

AzCopy を使用して .csv ファイルから Data Lake Storage Gen2 アカウントにデータをコピーします。

  1. コマンド プロンプト ウィンドウを開き、次のコマンドを入力してストレージ アカウントにログインします。

    azcopy login
    

    コマンド プロンプト ウィンドウに表示される指示に従って、ユーザー アカウントを認証します。

  2. .csv アカウントからデータをコピーするには、次のコマンドを入力します。

    azcopy cp "<csv-folder-path>" https://<storage-account-name>.dfs.core.windows.net/<container-name>/folder1/On_Time.csv
    
    • プレースホルダー <csv-folder-path> の値は、 .csv ファイルへのパスに置き換えます。

    • <storage-account-name> プレースホルダーの値は、実際のストレージ アカウントの名前に置き換えます。

    • <container-name> プレースホルダーは、実際のストレージ アカウントにあるコンテナーの名前に置き換えます。

Azure Databricks ワークスペース、クラスター、ノートブックを作成する

  1. Azure Databricks ワークスペースを作成する。 「Azure Databricks ワークスペースを作成する」をご覧ください。

  2. クラスターを作成する。 クラスターの作成に関する記事を参照してください。

  3. Notebook を作成します。 「ノートブックを作成する」を参照してください。 ノートブックの既定の言語として Python を選択します。

コンテナーを作成してマウントする

  1. [クラスター] ドロップダウン リストで、先ほど作成したクラスターが選択されていることを確認します。

  2. Create をクリックしてください。 ノートブックが開き、上部に空のセルが示されます。

  3. 次のコード ブロックをコピーして最初のセルに貼り付けます。ただし、このコードはまだ実行しないでください。

    configs = {"fs.azure.account.auth.type": "OAuth",
           "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
           "fs.azure.account.oauth2.client.id": "<appId>",
           "fs.azure.account.oauth2.client.secret": "<clientSecret>",
           "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/<tenant>/oauth2/token",
           "fs.azure.createRemoteFileSystemDuringInitialization": "true"}
    
    dbutils.fs.mount(
    source = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/folder1",
    mount_point = "/mnt/flightdata",
    extra_configs = configs)
    
  4. このコード ブロックでは、appIdclientSecrettenant、および storage-account-name のプレースホルダー値を、このチュートリアルの前提条件の実行中に収集した値で置き換えます。 プレースホルダー container-name の値は、コンテナーの名前に置き換えます。

  5. Shift + Enter キーを押して、このブロック内のコードを実行します。

    このノートブックは開いたままにしておいてください。後でコマンドを追加します。

Databricks Notebook を使用して CSV を Parquet に変換する

前もって作成しておいたノートブックに新しいセルを追加し、そこに次のコードを貼り付けます。

# Use the previously established DBFS mount point to read the data.
# create a data frame to read data.

flightDF = spark.read.format('csv').options(
    header='true', inferschema='true').load("/mnt/flightdata/*.csv")

# read the airline csv file and write the output to parquet format for easy query.
flightDF.write.mode("append").parquet("/mnt/flightdata/parquet/flights")
print("Done")

データの探索

AzCopy 経由でアップロードされた CSV ファイルの一覧を取得するために、次のコードを新しいセルに貼り付けます。

import os.path
import IPython
from pyspark.sql import SQLContext
display(dbutils.fs.ls("/mnt/flightdata"))

新しいファイルを作成して parquet/flights フォルダー内のファイルの一覧を作成するには、次のスクリプトを実行します。

dbutils.fs.put("/mnt/flightdata/1.txt", "Hello, World!", True)
dbutils.fs.ls("/mnt/flightdata/parquet/flights")

上記のコード サンプルでは、Data Lake Storage Gen2 対応のストレージ アカウントに格納されたデータを使って HDFS の階層的な性質を調査しました。

データにクエリを実行する

これで、ストレージ アカウントにアップロードしたデータの照会を開始できます。 次のコード ブロックをそれぞれ [Cmd 1] に入力し、Cmd を押しながら Enter キーを押して Python スクリプトを実行します。

データ ソースのデータフレームを作成するには、次のスクリプトを実行します。

  • プレースホルダー <csv-folder-path> の値は、 .csv ファイルへのパスに置き換えます。
# Copy this into a Cmd cell in your notebook.
acDF = spark.read.format('csv').options(
    header='true', inferschema='true').load("/mnt/flightdata/On_Time.csv")
acDF.write.parquet('/mnt/flightdata/parquet/airlinecodes')

# read the existing parquet file for the flights database that was created earlier
flightDF = spark.read.format('parquet').options(
    header='true', inferschema='true').load("/mnt/flightdata/parquet/flights")

# print the schema of the dataframes
acDF.printSchema()
flightDF.printSchema()

# print the flight database size
print("Number of flights in the database: ", flightDF.count())

# show the first 20 rows (20 is the default)
# to show the first n rows, run: df.show(n)
acDF.show(100, False)
flightDF.show(20, False)

# Display to run visualizations
# preferably run this in a separate cmd cell
display(flightDF)

いくつかの基本的な分析クエリをデータに対して実行するために、次のスクリプトを入力します。

# Run each of these queries, preferably in a separate cmd cell for separate analysis
# create a temporary sql view for querying flight information
FlightTable = spark.read.parquet('/mnt/flightdata/parquet/flights')
FlightTable.createOrReplaceTempView('FlightTable')

# create a temporary sql view for querying airline code information
AirlineCodes = spark.read.parquet('/mnt/flightdata/parquet/airlinecodes')
AirlineCodes.createOrReplaceTempView('AirlineCodes')

# using spark sql, query the parquet file to return total flights in January and February 2016
out1 = spark.sql("SELECT * FROM FlightTable WHERE Month=1 and Year= 2016")
NumJan2016Flights = out1.count()
out2 = spark.sql("SELECT * FROM FlightTable WHERE Month=2 and Year= 2016")
NumFeb2016Flights = out2.count()
print("Jan 2016: ", NumJan2016Flights, " Feb 2016: ", NumFeb2016Flights)
Total = NumJan2016Flights+NumFeb2016Flights
print("Total flights combined: ", Total)

# List out all the airports in Texas
out = spark.sql(
    "SELECT distinct(OriginCityName) FROM FlightTable where OriginStateName = 'Texas'")
print('Airports in Texas: ', out.show(100))

# find all airlines that fly from Texas
out1 = spark.sql(
    "SELECT distinct(Reporting_Airline) FROM FlightTable WHERE OriginStateName='Texas'")
print('Airlines that fly to/from Texas: ', out1.show(100, False))

リソースをクリーンアップする

リソース グループおよび関連するすべてのリソースは、不要になったら削除します。 これを行うには、ストレージ アカウントのリソース グループを選択し、 [削除] を選択してください。

次のステップ