このチュートリアルでは、Apache Spark を使用したデータ オーケストレーション用の最初の ETL (抽出、変換、読み込み) パイプラインを開発してデプロイする方法について説明します。 このチュートリアルでは Databricks の汎用コンピューティングを使用しますが、ワークスペースで有効になっている場合は、サーバーレス コンピューティングを使用することもできます。
また、Lakeflow Spark 宣言パイプラインを使用して ETL パイプラインを構築することもできます。 Databricks Lakeflow Spark 宣言パイプラインを使用すると、運用 ETL パイプラインの構築、デプロイ、保守の複雑さが軽減されます。 「 チュートリアル: Lakeflow Spark 宣言パイプラインを使用して ETL パイプラインを構築する」を参照してください。
この記事の終わりまでに、次の方法がわかります。
- Databricks の汎用コンピューティング リソースを起動します。
- Databricks ノートブックを作成します。
- 自動ローダーを使用して Delta Lake への増分データ インジェストを構成します。
- データの処理と操作。
- ノートブックを Databricks ジョブとしてスケジュールします。
このチュートリアルでは、対話型ノートブックを使用して、Pythonまたは Scala で一般的な ETL タスクを実行します。
Databricks Terraform プロバイダーを使用して、この記事のリソースを作成することもできます。 「Terraform を使用してクラスター、ノートブック、ジョブを作成する」を参照してください。
必要条件
- Azure Databricks ワークスペースにログインしています。
- コンピューティング リソースを作成するアクセス許可があります。
注
コンピューティング制御特権がない場合でも、 コンピューティング リソースにアクセスできる限り、以下のほとんどの手順を完了できます。
手順 1: コンピューティング リソースを作成する
探索的データ分析とデータ エンジニアリングを行うには、コマンドを実行するコンピューティング リソースを作成します。
- サイド バーで
[コンピューティング] をクリックします。 - [コンピューティング] ページで、[ コンピューティングの作成] をクリックします。
- コンピューティング リソースの一意の名前を指定し、残りの値を既定の状態のままにして、[ コンピューティングの作成] をクリックします。
Databricks コンピューティングの詳細については、「 コンピューティング」を参照してください。
手順 2: Databricks ノートブックを作成する
ワークスペースにノートブックを作成するには、サイド バーの
[新規] をクリックしてから、[ノートブック] をクリックします。 ワークスペースに空のノートブックが開きます。
ノートブックの作成と管理について詳しくは、「Notebooks を管理する」を参照してください。
手順 3: Delta Lake にデータを取り込むように自動ローダーを構成する
Databricks では、自動ローダーを使用した増分データ インジェストが推奨されています。 自動ローダーは、クラウド オブジェクト ストレージに到着した新しいファイルを自動的に検出して処理します。
Databricks はデータの格納に Delta Lake を使用することを推奨しています。 Delta Lake は、ACID トランザクションを提供し、Data Lakehouse を有効にするopen sourceストレージ レイヤーです。 Databricks に作成されるテーブルの既定の形式は Delta Lake です。
Delta Lake テーブルにデータを取り込むように自動ローダーを構成するには、次のコードをコピーして、ノートブックの空のセルに貼り付けます。
Python
# Import functions
from pyspark.sql.functions import col, current_timestamp
# Define variables used in code below
file_path = "/databricks-datasets/structured-streaming/events"
username = spark.sql("SELECT regexp_replace(session_user(), '[^a-zA-Z0-9]', '_')").first()[0]
table_name = f"{username}_etl_quickstart"
checkpoint_path = f"/tmp/{username}/_checkpoint/etl_quickstart"
# Clear out data from previous demo execution
spark.sql(f"DROP TABLE IF EXISTS {table_name}")
dbutils.fs.rm(checkpoint_path, True)
# Configure Auto Loader to ingest JSON data to a Delta table
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", checkpoint_path)
.load(file_path)
.select("*", col("_metadata.file_path").alias("source_file"), current_timestamp().alias("processing_time"))
.writeStream
.option("checkpointLocation", checkpoint_path)
.trigger(availableNow=True)
.toTable(table_name))
スカラ (プログラミング言語)
// Imports
import org.apache.spark.sql.functions.current_timestamp
import org.apache.spark.sql.streaming.Trigger
import spark.implicits._
// Define variables used in code below
val file_path = "/databricks-datasets/structured-streaming/events"
val username = spark.sql("SELECT regexp_replace(session_user(), '[^a-zA-Z0-9]', '_')").first.get(0)
val table_name = s"${username}_etl_quickstart"
val checkpoint_path = s"/tmp/${username}/_checkpoint"
// Clear out data from previous demo execution
spark.sql(s"DROP TABLE IF EXISTS ${table_name}")
dbutils.fs.rm(checkpoint_path, true)
// Configure Auto Loader to ingest JSON data to a Delta table
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", checkpoint_path)
.load(file_path)
.select($"*", $"_metadata.file_path".as("source_file"), current_timestamp.as("processing_time"))
.writeStream
.option("checkpointLocation", checkpoint_path)
.trigger(Trigger.AvailableNow)
.toTable(table_name)
注
このコードで定義されている変数を使用すれば、既存のワークスペース資産や他のユーザーと競合することなく安全にコードを実行できます。 ネットワークまたはストレージのアクセス許可が制限されている場合は、このコードを実行するとエラーが発生します。ワークスペース管理者に連絡して、それらの制限のトラブルシューティングを行ってください。
自動ローダーの詳細については、「自動ローダー」を参照してください。
手順 4: データを処理してデータと対話する
ノートブックでは、ロジックがセル単位で実行されます。 セル内のロジックを実行するには:
前の手順で完成したセルを実行するために、そのセルを選択し、Shift + Enter キーを押します。
作成したテーブルに対してクエリを実行するには、次のコードをコピーして空のセルに貼り付け、 Shift キーを押しながら Enter キー を押してセルを実行します。
Python
df = spark.read.table(table_name)スカラ (プログラミング言語)
val df = spark.read.table(table_name)DataFrame 内のデータをプレビューするために、以下のコードをコピーして空のセルに貼り付け、Shift + Enter キーを押してセルを実行します。
Python
display(df)スカラ (プログラミング言語)
display(df)
データを視覚化するための対話型オプションの詳細については、 Databricks ノートブックと SQL エディターでの視覚化に関するページを参照してください。
手順 5: ジョブをスケジュールする
Databricks ノートブックを運用スクリプトとして実行するには、それらをタスクとして Databricks ジョブに追加します。 この手順では、手動でトリガーできる新しいジョブを作成します。
ノートブックをタスクとしてスケジュールするには:
- ヘッダー バーの右側にある [スケジュール] をクリックします。
- [ジョブ名] に一意の名前を入力します。
- [手動] をクリックします。
- [ コンピューティング ] ドロップダウンで、手順 1 で作成したコンピューティング リソースを選択します。
- [作成] をクリックします。
- 表示されたウィンドウで、[今すぐ実行] をクリックします。
- ジョブの実行結果を見るには、「前回実行」のタイムスタンプ横にある
アイコンをクリックします。
ジョブの詳細については、「ジョブとは」を参照してください。.
その他の統合
Azure Databricksを使用したデータ エンジニアリングの統合とツールの詳細については、以下をご覧ください。