&
このチュートリアルでは、Azure Data Lake Storage Gen2 対応の Azure ストレージ アカウント内の格納データに Azure Databricks クラスターを接続する方法を説明します。 この接続を使用することで、必要なデータに関するクエリや分析をクラスターからネイティブに実行することができます。
このチュートリアルでは、次のことについて説明します。
- 非構造化データをストレージ アカウントに取り込む
- Blob Storage 内のデータに対して分析を実行する
Azure サブスクリプションをお持ちでない場合は、開始する前に 無料アカウント を作成してください。
前提条件
階層型名前空間 (Azure Data Lake Storage Gen2) を持つストレージ アカウントを作成します
「Azure Data Lake Storage Gen2 で使用するストレージ アカウントを作成する」をご覧ください。
ユーザー アカウントにストレージ BLOB データ共同作成者ロールが割り当てられていることを確認します。
AzCopy v10 をインストールします。 AzCopy v10 を使用したデータ転送に関するページを参照してください。
サービス プリンシパルを作成し、クライアント シークレットを作成し、サービス プリンシパルにストレージ アカウントへのアクセス権を付与します。
「チュートリアル: Azure Data Lake Storage Gen2 に接続する」 (手順 1 から 3) を参照してください。 これらの手順を完了したら、テナント ID、アプリ ID、クライアント シークレットの値をテキスト ファイルに貼り付けてください。 これらはすぐに必要になります。
フライト データのダウンロード
このチュートリアルでは、運輸統計局からのフライト データを使用して ETL 操作を実行する方法を示します。 チュートリアルを完了するには、このデータをダウンロードする必要があります。
On_Time_Reporting_Carrier_On_Time_Performance_1987_present_2016_1.zip ファイルをダウンロードします。 このファイルには、フライト データが含まれています。
ZIP ファイルの内容を解凍し、ファイル名とファイル パスをメモします。 この情報は後の手順で必要になります。
データの取り込み
ソース データをストレージ アカウントにコピーする
AzCopy を使用して .csv ファイルから Data Lake Storage Gen2 アカウントにデータをコピーします。
コマンド プロンプト ウィンドウを開き、次のコマンドを入力してストレージ アカウントにログインします。
azcopy login
コマンド プロンプト ウィンドウに表示される指示に従って、ユーザー アカウントを認証します。
.csv アカウントからデータをコピーするには、次のコマンドを入力します。
azcopy cp "<csv-folder-path>" https://<storage-account-name>.dfs.core.windows.net/<container-name>/folder1/On_Time.csv
プレースホルダー
<csv-folder-path>
の値は、 .csv ファイルへのパスに置き換えます。<storage-account-name>
プレースホルダーの値は、実際のストレージ アカウントの名前に置き換えます。<container-name>
プレースホルダーは、実際のストレージ アカウントにあるコンテナーの名前に置き換えます。
Azure Databricks ワークスペース、クラスター、ノートブックを作成する
Azure Databricks ワークスペースを作成する。 「Azure Databricks ワークスペースを作成する」をご覧ください。
クラスターを作成する。 クラスターの作成に関する記事を参照してください。
Notebook を作成します。 「ノートブックを作成する」を参照してください。 ノートブックの既定の言語として Python を選択します。
コンテナーを作成してマウントする
[クラスター] ドロップダウン リストで、先ほど作成したクラスターが選択されていることを確認します。
Create をクリックしてください。 ノートブックが開き、上部に空のセルが示されます。
次のコード ブロックをコピーして最初のセルに貼り付けます。ただし、このコードはまだ実行しないでください。
configs = {"fs.azure.account.auth.type": "OAuth", "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider", "fs.azure.account.oauth2.client.id": "<appId>", "fs.azure.account.oauth2.client.secret": "<clientSecret>", "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/<tenant>/oauth2/token", "fs.azure.createRemoteFileSystemDuringInitialization": "true"} dbutils.fs.mount( source = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/folder1", mount_point = "/mnt/flightdata", extra_configs = configs)
このコード ブロックでは、
appId
、clientSecret
、tenant
、およびstorage-account-name
のプレースホルダー値を、このチュートリアルの前提条件の実行中に収集した値で置き換えます。 プレースホルダーcontainer-name
の値は、コンテナーの名前に置き換えます。Shift + Enter キーを押して、このブロック内のコードを実行します。
このノートブックは開いたままにしておいてください。後でコマンドを追加します。
Databricks Notebook を使用して CSV を Parquet に変換する
前もって作成しておいたノートブックに新しいセルを追加し、そこに次のコードを貼り付けます。
# Use the previously established DBFS mount point to read the data.
# create a data frame to read data.
flightDF = spark.read.format('csv').options(
header='true', inferschema='true').load("/mnt/flightdata/*.csv")
# read the airline csv file and write the output to parquet format for easy query.
flightDF.write.mode("append").parquet("/mnt/flightdata/parquet/flights")
print("Done")
データの探索
AzCopy 経由でアップロードされた CSV ファイルの一覧を取得するために、次のコードを新しいセルに貼り付けます。
import os.path
import IPython
from pyspark.sql import SQLContext
display(dbutils.fs.ls("/mnt/flightdata"))
新しいファイルを作成して parquet/flights フォルダー内のファイルの一覧を作成するには、次のスクリプトを実行します。
dbutils.fs.put("/mnt/flightdata/1.txt", "Hello, World!", True)
dbutils.fs.ls("/mnt/flightdata/parquet/flights")
上記のコード サンプルでは、Data Lake Storage Gen2 対応のストレージ アカウントに格納されたデータを使って HDFS の階層的な性質を調査しました。
データにクエリを実行する
これで、ストレージ アカウントにアップロードしたデータの照会を開始できます。 次のコード ブロックをそれぞれ [Cmd 1] に入力し、Cmd を押しながら Enter キーを押して Python スクリプトを実行します。
データ ソースのデータフレームを作成するには、次のスクリプトを実行します。
- プレースホルダー
<csv-folder-path>
の値は、 .csv ファイルへのパスに置き換えます。
# Copy this into a Cmd cell in your notebook.
acDF = spark.read.format('csv').options(
header='true', inferschema='true').load("/mnt/flightdata/On_Time.csv")
acDF.write.parquet('/mnt/flightdata/parquet/airlinecodes')
# read the existing parquet file for the flights database that was created earlier
flightDF = spark.read.format('parquet').options(
header='true', inferschema='true').load("/mnt/flightdata/parquet/flights")
# print the schema of the dataframes
acDF.printSchema()
flightDF.printSchema()
# print the flight database size
print("Number of flights in the database: ", flightDF.count())
# show the first 20 rows (20 is the default)
# to show the first n rows, run: df.show(n)
acDF.show(100, False)
flightDF.show(20, False)
# Display to run visualizations
# preferably run this in a separate cmd cell
display(flightDF)
いくつかの基本的な分析クエリをデータに対して実行するために、次のスクリプトを入力します。
# Run each of these queries, preferably in a separate cmd cell for separate analysis
# create a temporary sql view for querying flight information
FlightTable = spark.read.parquet('/mnt/flightdata/parquet/flights')
FlightTable.createOrReplaceTempView('FlightTable')
# create a temporary sql view for querying airline code information
AirlineCodes = spark.read.parquet('/mnt/flightdata/parquet/airlinecodes')
AirlineCodes.createOrReplaceTempView('AirlineCodes')
# using spark sql, query the parquet file to return total flights in January and February 2016
out1 = spark.sql("SELECT * FROM FlightTable WHERE Month=1 and Year= 2016")
NumJan2016Flights = out1.count()
out2 = spark.sql("SELECT * FROM FlightTable WHERE Month=2 and Year= 2016")
NumFeb2016Flights = out2.count()
print("Jan 2016: ", NumJan2016Flights, " Feb 2016: ", NumFeb2016Flights)
Total = NumJan2016Flights+NumFeb2016Flights
print("Total flights combined: ", Total)
# List out all the airports in Texas
out = spark.sql(
"SELECT distinct(OriginCityName) FROM FlightTable where OriginStateName = 'Texas'")
print('Airports in Texas: ', out.show(100))
# find all airlines that fly from Texas
out1 = spark.sql(
"SELECT distinct(Reporting_Airline) FROM FlightTable WHERE OriginStateName='Texas'")
print('Airlines that fly to/from Texas: ', out1.show(100, False))
リソースをクリーンアップする
リソース グループおよび関連するすべてのリソースは、不要になったら削除します。 これを行うには、ストレージ アカウントのリソース グループを選択し、 [削除] を選択してください。