Azure Databricks で最初の ETL ワークロードを実行する

Azure Databricks の運用環境向けのツールを使用して、データ オーケストレーション用の最初の抽出、変換、読み込み (ETL) パイプラインを開発してデプロイする方法について説明します。

この記事を読み終えるころには、次のような作業が快適に感じることでしょう。

  1. Databricks の汎用コンピューティング クラスターを起動する
  2. Databricks ノートブックを作成する
  3. 自動ローダーを使用して Delta Lake への増分データ インジェストを構成する
  4. ノートブック セルを実行してデータを処理、照会、プレビューする
  5. ノートブックを Databricks ジョブとしてスケジュールする

このチュートリアルでは、対話型ノートブックを使用して Python または Scala から一般的な ETL タスクを実行します。

Delta Live Tables を使用して ETL パイプラインを構築することもできます。 Databricks は、運用 ETL パイプラインの構築、デプロイ、保守の複雑さを軽減するために Delta Live Tables を作成しました。 「チュートリアル: 最初の Delta Live Tables パイプラインを実行する」を参照してください。

Databricks Terraform プロバイダーを使用して、この記事のリソースを作成することもできます。 「Terraform を使用してクラスター、ノートブック、ジョブを作成する」を参照してください。

必要条件

Note

以降の手順の大半は、クラスターの制御権限がなくても、クラスターへのアクセス権があれば実行できます。

手順 1: クラスターを作成する

探索的データ分析やデータ エンジニアリングを行うには、コマンドの実行に必要なコンピューティング リソースを提供するクラスターを作成します。

  1. サイドバーで、compute icon[コンピューティング]をクリックします。
  2. [コンピューティング] ページの [クラスターの作成] をクリックします。 [新しいクラスター] ページが表示されます。
  3. クラスターの一意の名前を指定し、残りの値を既定の状態のままにして、[クラスターの作成] をクリックします。

Databricks クラスターの詳細については、「コンピューティング」を参照してください。

手順 2: Databricks ノートブックを作成する

Azure Databricks で対話的にコードを記述して実行するには、ノートブックを作成します。

  1. サイドバーで、新規アイコン[新規] をクリックし、[Notebook] をクリックします。
  2. [ノートブックの作成] ページで、次の手順を実行します。
    • ノートブックの一意の名前を指定します。
    • 既定の言語が Python または Scala に設定されていることを確認します。
    • 手順 1. で作成したクラスターを [クラスター] ボックスの一覧から選択します。
    • [作成] をクリックします。

ノートブックが開き、上部に空のセルが表示されます。

ノートブックの作成と管理について詳しくは、「Notebooks を管理する」を参照してください。

手順 3: Delta Lake にデータを取り込むように自動ローダーを構成する

Databricks では、自動ローダーを使用した増分データ インジェストが推奨されています。 自動ローダーは、クラウド オブジェクト ストレージに到着した新しいファイルを自動的に検出して処理します。

Databricks はデータの格納に Delta Lake を使用することを推奨しています。 Delta Lake はオープンソースのストレージ レイヤーです。ACID トランザクションを備え、データ レイクハウスを実現します。 Databricks に作成されるテーブルの既定の形式は Delta Lake です。

Delta Lake テーブルにデータを取り込むように自動ローダーを構成するには、次のコードをコピーして、ノートブックの空のセルに貼り付けます。

Python

# Import functions
from pyspark.sql.functions import col, current_timestamp

# Define variables used in code below
file_path = "/databricks-datasets/structured-streaming/events"
username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
table_name = f"{username}_etl_quickstart"
checkpoint_path = f"/tmp/{username}/_checkpoint/etl_quickstart"

# Clear out data from previous demo execution
spark.sql(f"DROP TABLE IF EXISTS {table_name}")
dbutils.fs.rm(checkpoint_path, True)

# Configure Auto Loader to ingest JSON data to a Delta table
(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", checkpoint_path)
  .load(file_path)
  .select("*", col("_metadata.file_path").alias("source_file"), current_timestamp().alias("processing_time"))
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .trigger(availableNow=True)
  .toTable(table_name))

Scala

// Imports
import org.apache.spark.sql.functions.current_timestamp
import org.apache.spark.sql.streaming.Trigger
import spark.implicits._

// Define variables used in code below
val file_path = "/databricks-datasets/structured-streaming/events"
val username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first.get(0)
val table_name = s"${username}_etl_quickstart"
val checkpoint_path = s"/tmp/${username}/_checkpoint"

// Clear out data from previous demo execution
spark.sql(s"DROP TABLE IF EXISTS ${table_name}")
dbutils.fs.rm(checkpoint_path, true)

// Configure Auto Loader to ingest JSON data to a Delta table
spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", checkpoint_path)
  .load(file_path)
  .select($"*", $"_metadata.file_path".as("source_file"), current_timestamp.as("processing_time"))
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .trigger(Trigger.AvailableNow)
  .toTable(table_name)

注意

このコードで定義されている変数を使用すれば、既存のワークスペース資産や他のユーザーと競合することなく安全にコードを実行できます。 ネットワークまたはストレージのアクセス許可が制限されている場合は、このコードを実行するとエラーが発生します。ワークスペース管理者に連絡して、それらの制限のトラブルシューティングを行ってください。

自動ローダーの詳細については、「自動ローダー」を参照してください。

手順 4: データを処理して対話的に操作する

ノートブックでは、ロジックがセル単位で実行されます。 セル内のロジックを実行するには:

  1. 前の手順で完成したセルを実行するために、そのセルを選択し、Shift + Enter キーを押します。

  2. 作成されたテーブルに対してクエリを実行するために、以下のコードをコピーして空のセルに貼り付け、Shift + Enter キーを押してセルを実行します。

    Python

    df = spark.read.table(table_name)
    

    Scala

    val df = spark.read.table(table_name)
    
  3. DataFrame 内のデータをプレビューするために、以下のコードをコピーして空のセルに貼り付け、Shift + Enter キーを押してセルを実行します。

    Python

    display(df)
    

    Scala

    display(df)
    

データを対話的に視覚化する方法について詳しくは、「Databricks ノートブックでの視覚化」を参照してください。

手順 5: ジョブをスケジュールする

Databricks ノートブックを運用スクリプトとして実行するには、それらをタスクとして Databricks ジョブに追加します。 この手順では、手動でトリガーできる新しいジョブを作成します。

ノートブックをタスクとしてスケジュールするには:

  1. ヘッダー バーの右側にある [スケジュール] をクリックします。
  2. [ジョブ名] に一意の名前を入力します。
  3. [手動] をクリックします。
  4. [クラスター] ボックスの一覧から、手順 1. で作成したクラスターを選択します。
  5. [作成] をクリックします。
  6. 表示されたウィンドウで、[今すぐ実行] をクリックします。
  7. ジョブの実行結果を確認するには、前回実行タイムスタンプの横にある 外部リンク アイコンをクリックします。

ジョブの詳細については、Azure Databricks のジョブとはに関するページを参照してください。

その他の統合

Azure Databricks を使用したデータ エンジニアリングのための統合とツールについては、以下の記事を参照してください。