チュートリアル: Databricks プラットフォームで Apache Spark を使用して ETL パイプラインを構築する

このチュートリアルでは、Apache Spark を使用したデータ オーケストレーション用の最初の ETL (抽出、変換、読み込み) パイプラインを開発してデプロイする方法について説明します。 このチュートリアルでは Databricks の汎用コンピューティングを使用しますが、ワークスペースで有効になっている場合は、サーバーレス コンピューティングを使用することもできます。

また、Lakeflow Spark 宣言パイプラインを使用して ETL パイプラインを構築することもできます。 Databricks Lakeflow Spark 宣言パイプラインを使用すると、運用 ETL パイプラインの構築、デプロイ、保守の複雑さが軽減されます。 「 チュートリアル: Lakeflow Spark 宣言パイプラインを使用して ETL パイプラインを構築する」を参照してください。

この記事の終わりまでに、次の方法がわかります。

  1. Databricks の汎用コンピューティング リソースを起動します
  2. Databricks ノートブックを作成します
  3. 自動ローダーを使用して Delta Lake への増分データ インジェストを構成します
  4. データの処理と操作
  5. ノートブックを Databricks ジョブとしてスケジュールします

このチュートリアルでは、対話型ノートブックを使用して、Pythonまたは Scala で一般的な ETL タスクを実行します。

Databricks Terraform プロバイダーを使用して、この記事のリソースを作成することもできます。 「Terraform を使用してクラスター、ノートブック、ジョブを作成する」を参照してください。

必要条件

コンピューティング制御特権がない場合でも、 コンピューティング リソースにアクセスできる限り、以下のほとんどの手順を完了できます。

手順 1: コンピューティング リソースを作成する

探索的データ分析とデータ エンジニアリングを行うには、コマンドを実行するコンピューティング リソースを作成します。

  1. サイド バーで compute icon[コンピューティング] をクリックします。
  2. [コンピューティング] ページで、[ コンピューティングの作成] をクリックします。
  3. コンピューティング リソースの一意の名前を指定し、残りの値を既定の状態のままにして、[ コンピューティングの作成] をクリックします。

Databricks コンピューティングの詳細については、「 コンピューティング」を参照してください。

手順 2: Databricks ノートブックを作成する

ワークスペースにノートブックを作成するには、サイド バーの 新規アイコン[新規] をクリックしてから、[ノートブック] をクリックします。 ワークスペースに空のノートブックが開きます。

ノートブックの作成と管理について詳しくは、「Notebooks を管理する」を参照してください。

手順 3: Delta Lake にデータを取り込むように自動ローダーを構成する

Databricks では、自動ローダーを使用した増分データ インジェストが推奨されています。 自動ローダーは、クラウド オブジェクト ストレージに到着した新しいファイルを自動的に検出して処理します。

Databricks はデータの格納に Delta Lake を使用することを推奨しています。 Delta Lake は、ACID トランザクションを提供し、Data Lakehouse を有効にするopen sourceストレージ レイヤーです。 Databricks に作成されるテーブルの既定の形式は Delta Lake です。

Delta Lake テーブルにデータを取り込むように自動ローダーを構成するには、次のコードをコピーして、ノートブックの空のセルに貼り付けます。

Python

# Import functions
from pyspark.sql.functions import col, current_timestamp

# Define variables used in code below
file_path = "/databricks-datasets/structured-streaming/events"
username = spark.sql("SELECT regexp_replace(session_user(), '[^a-zA-Z0-9]', '_')").first()[0]
table_name = f"{username}_etl_quickstart"
checkpoint_path = f"/tmp/{username}/_checkpoint/etl_quickstart"

# Clear out data from previous demo execution
spark.sql(f"DROP TABLE IF EXISTS {table_name}")
dbutils.fs.rm(checkpoint_path, True)

# Configure Auto Loader to ingest JSON data to a Delta table
(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", checkpoint_path)
  .load(file_path)
  .select("*", col("_metadata.file_path").alias("source_file"), current_timestamp().alias("processing_time"))
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .trigger(availableNow=True)
  .toTable(table_name))

スカラ (プログラミング言語)

// Imports
import org.apache.spark.sql.functions.current_timestamp
import org.apache.spark.sql.streaming.Trigger
import spark.implicits._

// Define variables used in code below
val file_path = "/databricks-datasets/structured-streaming/events"
val username = spark.sql("SELECT regexp_replace(session_user(), '[^a-zA-Z0-9]', '_')").first.get(0)
val table_name = s"${username}_etl_quickstart"
val checkpoint_path = s"/tmp/${username}/_checkpoint"

// Clear out data from previous demo execution
spark.sql(s"DROP TABLE IF EXISTS ${table_name}")
dbutils.fs.rm(checkpoint_path, true)

// Configure Auto Loader to ingest JSON data to a Delta table
spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", checkpoint_path)
  .load(file_path)
  .select($"*", $"_metadata.file_path".as("source_file"), current_timestamp.as("processing_time"))
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .trigger(Trigger.AvailableNow)
  .toTable(table_name)

このコードで定義されている変数を使用すれば、既存のワークスペース資産や他のユーザーと競合することなく安全にコードを実行できます。 ネットワークまたはストレージのアクセス許可が制限されている場合は、このコードを実行するとエラーが発生します。ワークスペース管理者に連絡して、それらの制限のトラブルシューティングを行ってください。

自動ローダーの詳細については、「自動ローダー」を参照してください。

手順 4: データを処理してデータと対話する

ノートブックでは、ロジックがセル単位で実行されます。 セル内のロジックを実行するには:

  1. 前の手順で完成したセルを実行するために、そのセルを選択し、Shift + Enter キーを押します。

  2. 作成したテーブルに対してクエリを実行するには、次のコードをコピーして空のセルに貼り付け、 Shift キーを押しながら Enter キー を押してセルを実行します。

    Python

    df = spark.read.table(table_name)
    

    スカラ (プログラミング言語)

    val df = spark.read.table(table_name)
    
  3. DataFrame 内のデータをプレビューするために、以下のコードをコピーして空のセルに貼り付け、Shift + Enter キーを押してセルを実行します。

    Python

    display(df)
    

    スカラ (プログラミング言語)

    display(df)
    

データを視覚化するための対話型オプションの詳細については、 Databricks ノートブックと SQL エディターでの視覚化に関するページを参照してください。

手順 5: ジョブをスケジュールする

Databricks ノートブックを運用スクリプトとして実行するには、それらをタスクとして Databricks ジョブに追加します。 この手順では、手動でトリガーできる新しいジョブを作成します。

ノートブックをタスクとしてスケジュールするには:

  1. ヘッダー バーの右側にある [スケジュール] をクリックします。
  2. [ジョブ名] に一意の名前を入力します。
  3. [手動] をクリックします。
  4. [ コンピューティング ] ドロップダウンで、手順 1 で作成したコンピューティング リソースを選択します。
  5. [作成] をクリックします。
  6. 表示されたウィンドウで、[今すぐ実行] をクリックします。
  7. ジョブの実行結果を見るには、「前回実行」のタイムスタンプ横にある 外部リンク アイコンをクリックします。

ジョブの詳細については、「ジョブとは」を参照してください。.

その他の統合

Azure Databricksを使用したデータ エンジニアリングの統合とツールの詳細については、以下をご覧ください。