Databricks でエンド ツー エンドのデータ パイプラインを構築する

この記事では、エンドツーエンドのデータ処理パイプラインを作成してデプロイする方法について説明します。これには、生データの取り込み、データの変換、処理されたデータに対する分析の実行などの方法が含まれます。

注意

この記事では、Databricks ノートブックと Azure Databricks ジョブを使用して完全なデータ パイプラインを作成し、ワークフローのオーケストレーションを行う方法を示しますが、Databricks では Delta Live Tables を使用することを推奨しています。これは、信頼性が高く、保守がしやすい、テスト可能なデータ処理パイプラインを構築するための宣言型インターフェイスです。

データ パイプラインとは

データ パイプラインでは、ソース システムからデータを移動し、要件に基づいてそのデータを変換し、ターゲット システムにデータを格納するために必要な手順を実装します。 データ パイプラインには、生データをユーザーが使用できる準備済みデータに変換するために必要なすべてのプロセスが含まれます。 たとえば、データ パイプラインは、データ アナリストやデータ科学者が分析とレポートを通じてデータから価値を抽出できるようにデータを準備する場合があります。

抽出、変換、読み込み (ETL) ワークフローは、データ パイプラインの一般的な例です。 ETL 処理では、データはソース システムから取り込まれて、ステージング領域に書き込まれ、要件に基づいて変換 (データ品質の確保、レコードの重複の除去など) されてから、データ ウェアハウスやデータ レイクなどのターゲット システムに書き込まれます。

データ パイプラインの手順

Azure Databricks におけるデータ パイプライン構築の開始を支援するために、この記事に含まれている例では、データ処理ワークフローの作成について説明します。

  • Azure Databricks の機能を使用して、生のデータセットを調べます。
  • Databricks ノートブックを作成して生ソース データを取り込み、生データをターゲット テーブルに書き込みます。
  • Databricks ノートブックを作成して生ソース データを変換し、変換されたデータをターゲット テーブルに書き込みます。
  • Databricks ノートブックを作成して、変換されたデータに対してクエリを実行します。
  • Azure Databricks ジョブを使用してデータ パイプラインを自動化します。

要件

例: Million Song データセット

この例で使用するデータセットは、現代音楽トラックの特徴とメタデータのコレクションである Million Song Dataset のサブセットです。 このデータセットは、Azure Databricks ワークスペースに含まれているサンプル データセット内にあります。

手順 1: クラスターを作成する

この例のデータ処理と分析を実行するには、コマンドの実行に必要なコンピューティング リソースを提供するクラスターを作成します。

Note

この例では、DBFS に保存されているサンプル データセットを使い、テーブルを Unity Catalog に保持することを推奨しているため、"シングル ユーザー アクセス" モードで構成されたクラスターを作成します。 シングル ユーザー アクセス モードでは、DBFS へのフル アクセスが提供されると同時に、Unity Catalog へのアクセスも可能になります。 「DBFS と Unity Catalog のベスト プラクティス」をご覧ください。

  1. サイドバーにある [コンピューティング] をクリックします。
  2. [コンピューティング] ページの [クラスターの作成] をクリックします。
  3. [新しいクラスター] ページで、クラスターの一意の名前を入力します。
  4. [アクセス モード] で、[単一ユーザー] を選択します。
  5. [シングル ユーザーまたはサービス プリンシパル アクセス] で、自分のユーザー名を選びます。
  6. 残りの値を既定の状態のままにして、[クラスターの作成] をクリックします。

Databricks クラスターの詳細については、「コンピューティング」を参照してください。

手順 2: ソース データを調べる

Azure Databricks インターフェイスを使用して生ソース データを調べる方法については、「データ パイプラインのソース データを調べる」をご覧ください。 データの取り込みと準備に直接進む場合は、「手順 3: 生データを取り込む」に進みます。

手順 3: 生データを取り込む

この手順では、生データをテーブルに読み込み、さらに処理できるようにします。 テーブルなどの Databricks プラットフォームでデータ資産を管理するために、Databricks では Unity カタログをお勧めします。 ただし、テーブルを Unity Catalog に公開するために必要なカタログとスキーマを作成するためのアクセス許可がない場合でも、テーブルを Hive メタストアに公開することで、次の手順を実行できます。

Databricks では、データの取り込みに自動ローダーを使用することが推奨されます。 自動ローダーは、クラウド オブジェクト ストレージに到着した新しいファイルを自動的に検出して処理します。

読み込まれたデータのスキーマを自動的に検出するように自動ローダーを構成できます。これにより、データ スキーマを明示的に宣言せずにテーブルを初期化し、新しい列が導入されたときにテーブル スキーマを進化させることができます。 これにより、スキーマの変更を常時手動で追跡して適用する必要がなくなります。 Databricks では、自動ローダーを使用する場合、スキーマ推論をお勧めします。 ただし、データの調査の手順で確認したように、曲データにヘッダー情報は含まれません。 ヘッダーはデータと共に格納されないため、次の例に示すように、スキーマを明示的に定義する必要があります。

  1. サイドバーで、New Icon[新規] をクリックし、メニューから [ノートブック] を選択します。 [ノートブックの作成] ダイアログボックスが表示されます。

  2. ノートブックの名前を入力します (例: Ingest songs data)。 既定での動作は次のとおりです。

    • 選択された言語は Python です。
    • ノートブックは、最後に使用したクラスターにアタッチされます。 この場合、「手順 1: クラスターを作成する」で作成したクラスターです。
  3. ノートブックの最初のセルに、次のように入力します。

    from pyspark.sql.types import DoubleType, IntegerType, StringType, StructType, StructField
    
    # Define variables used in the code below
    file_path = "/databricks-datasets/songs/data-001/"
    table_name = "<table-name>"
    checkpoint_path = "/tmp/pipeline_get_started/_checkpoint/song_data"
    
    schema = StructType(
      [
        StructField("artist_id", StringType(), True),
        StructField("artist_lat", DoubleType(), True),
        StructField("artist_long", DoubleType(), True),
        StructField("artist_location", StringType(), True),
        StructField("artist_name", StringType(), True),
        StructField("duration", DoubleType(), True),
        StructField("end_of_fade_in", DoubleType(), True),
        StructField("key", IntegerType(), True),
        StructField("key_confidence", DoubleType(), True),
        StructField("loudness", DoubleType(), True),
        StructField("release", StringType(), True),
        StructField("song_hotnes", DoubleType(), True),
        StructField("song_id", StringType(), True),
        StructField("start_of_fade_out", DoubleType(), True),
        StructField("tempo", DoubleType(), True),
        StructField("time_signature", DoubleType(), True),
        StructField("time_signature_confidence", DoubleType(), True),
        StructField("title", StringType(), True),
        StructField("year", IntegerType(), True),
        StructField("partial_sequence", IntegerType(), True)
      ]
    )
    
    (spark.readStream
      .format("cloudFiles")
      .schema(schema)
      .option("cloudFiles.format", "csv")
      .option("sep","\t")
      .load(file_path)
      .writeStream
      .option("checkpointLocation", checkpoint_path)
      .trigger(availableNow=True)
      .toTable(table_name)
    )
    

    Unity Catalog を使用している場合、<table-name> を、取り込まれるレコードを含めるカタログ、スキーマ、テーブル名に置き換えます (data_pipelines.songs_data.raw_song_data など)。 そうでない場合は、<table-name> を、取り込まれるレコードを含めるテーブルの名前に置き換えます (raw_song_data など)。

    <checkpoint-path> を、チェックポイント ファイルを保持する DBFS 内のディレクトリへのパスに置き換えます (/tmp/pipeline_get_started/_checkpoint/song_data など)。

  4. Run Menu をクリックし、[セルの実行] を選びます。 この例では、README の情報を使用してデータ スキーマを定義し、file_path に含まれるすべてのファイルから曲データを取り込み、table_name で指定したテーブルにデータを書き込みます。

手順 4: 生データを準備する

生データを分析用に準備するために、次の手順では、不要な列をフィルター処理で除外し、新しいレコードの作成のタイムスタンプを含む新しいフィールドを追加して、生の曲データを変換します。

  1. サイドバーで、New Icon[新規] をクリックし、メニューから [ノートブック] を選択します。 [ノートブックの作成] ダイアログボックスが表示されます。

  2. ノートブックの名前を入力します。 たとえば、「 Prepare songs data 」のように入力します。 既定の言語を SQL に変更します。

  3. ノートブックの最初のセルに、次のように入力します。

    CREATE OR REPLACE TABLE
      <table-name> (
        artist_id STRING,
        artist_name STRING,
        duration DOUBLE,
        release STRING,
        tempo DOUBLE,
        time_signature DOUBLE,
        title STRING,
        year DOUBLE,
        processed_time TIMESTAMP
      );
    
    INSERT INTO
      <table-name>
    SELECT
      artist_id,
      artist_name,
      duration,
      release,
      tempo,
      time_signature,
      title,
      year,
      current_timestamp()
    FROM
      <raw-songs-table-name>
    

    Unity Catalog を使用している場合、<table-name> を、フィルター処理および変換されたレコードを含めるカタログ、スキーマ、テーブル名に置き換えます (data_pipelines.songs_data.prepared_song_data など)。 そうでない場合は、<table-name> を、フィルター処理および変換されたレコードを含めるテーブルの名前に置き換えます (prepared_song_data など)。

    <raw-songs-table-name> を、前の手順で取り込まれた生の曲レコードを含むテーブルの名前に置き換えます。

  4. Run Menu をクリックし、[セルの実行] を選びます。

手順 5: 変換されたデータに対してクエリを実行する

この手順では、曲データを分析するクエリを追加して、処理パイプラインを拡張します。 これらのクエリでは、前の手順で作成した準備済みレコードを使用します。

  1. サイドバーで、New Icon[新規] をクリックし、メニューから [ノートブック] を選択します。 [ノートブックの作成] ダイアログボックスが表示されます。

  2. ノートブックの名前を入力します。 たとえば、「 Analyze songs data 」のように入力します。 既定の言語を SQL に変更します。

  3. ノートブックの最初のセルに、次のように入力します。

    -- Which artists released the most songs each year?
    SELECT
      artist_name,
      count(artist_name)
    AS
      num_songs,
      year
    FROM
      <prepared-songs-table-name>
    WHERE
      year > 0
    GROUP BY
      artist_name,
      year
    ORDER BY
      num_songs DESC,
      year DESC
    

    <prepared-songs-table-name> を、準備されたデータを含むテーブルの名前に置き換えます。 たとえば、「 data_pipelines.songs_data.prepared_song_data 」のように入力します。

  4. セルの操作メニューで Down Caret をクリックし、[下にセルを追加] を選び、新しいセルに次のように入力します。

     -- Find songs for your DJ list
     SELECT
       artist_name,
       title,
       tempo
     FROM
       <prepared-songs-table-name>
     WHERE
       time_signature = 4
       AND
       tempo between 100 and 140;
    

    <prepared-songs-table-name> を、前の手順で作成された、準備されたテーブルの名前に置き換えます。 たとえば、「 data_pipelines.songs_data.prepared_song_data 」のように入力します。

  5. クエリを実行して出力を表示するには、[すべて実行する] をクリックします。

手順 6: パイプラインを実行する Azure Databricks ジョブを作成する

Azure Databricks ジョブを使用して、データ インジェスト、処理、分析の各手順の実行を自動化するワークフローを作成できます。

  1. Data Science & Engineering ワークスペースで、次のいずれかを実行します。
    • サイドバーの Jobs Icon[ワークフロー] をクリックし、Create Job Button をクリックします。
    • サイドバーで New Icon[新規] をクリックし、[ジョブ] を選択します。
  2. [タスク] タブにあるタスク ダイアログ ボックスで、[ジョブの名前を追加...] を自分のジョブ名に置き換えます。 たとえば、"曲ワークフロー" などです。
  3. [タスク名] に、最初のタスクの名前 (例: Ingest_songs_data) を入力します。
  4. [種類] で、タスクの種類 [ノートブック] を選びます。
  5. [ソース] で、[ワークスペース] を選びます。
  6. ファイル ブラウザーを使用してデータ インジェスト ノートブックを探して、ノートブック名をクリックし、[確認] をクリックします。
  7. [クラスター] で、「Shared_job_cluster」または Create a cluster 手順で作成したクラスターを選びます。
  8. Create をクリックしてください。
  9. 作成したタスクの下の Add Task Button をクリックして、[ノートブック] を選びます。
  10. [タスク名] にタスクの名前 (例: Prepare_songs_data) を入力します。
  11. [種類] で、タスクの種類 [ノートブック] を選びます。
  12. [ソース] で、[ワークスペース] を選びます。
  13. ファイル ブラウザーを使用してデータ準備ノートブックを探して、ノートブック名をクリックし、[確認] をクリックします。
  14. [クラスター] で、「Shared_job_cluster」または Create a cluster 手順で作成したクラスターを選びます。
  15. Create をクリックしてください。
  16. 作成したタスクの下の Add Task Button をクリックして、[ノートブック] を選びます。
  17. [タスク名] にタスクの名前 (例: Analyze_songs_data) を入力します。
  18. [種類] で、タスクの種類 [ノートブック] を選びます。
  19. [ソース] で、[ワークスペース] を選びます。
  20. ファイル ブラウザーを使用してデータ分析ノートブックを探して、ノートブック名をクリックし、[確認] をクリックします。
  21. [クラスター] で、「Shared_job_cluster」または Create a cluster 手順で作成したクラスターを選びます。
  22. Create をクリックしてください。
  23. ワークフローを実行するには、Run Now Button をクリックします。 実行の詳細を表示するには、[ジョブの実行] ビューの実行の [開始時刻] 列にあるリンクをクリックします。 各タスクをクリックすると、そのタスクの実行の詳細が表示されます。
  24. ワークフローが完了しているときに結果を表示するには、最後のデータ分析タスクをクリックします。 [出力] ページが表示され、クエリの結果が表示されます。

手順 7: データ パイプライン ジョブのスケジュールを設定する

注意

この概要の例では、Azure Databricks ジョブを使用してスケジュールされたワークフローのオーケストレーションを示すために、取り込み、準備、分析の手順を別々のノートブックに分け、その後、各ノートブックを使用してジョブにタスクを作成します。 すべての処理が 1 つのノートブックに含まれる場合、Azure Databricks ノートブック UI からノートブックを直接かつ簡単にスケジュールできます。 「スケジュールされたノートブック ジョブの作成と管理」をご覧ください。

一般的な要件は、スケジュールに基づいてデータ パイプラインを実行することです。 パイプラインを実行するジョブのスケジュールを定義するには、次のようにします。

  1. サイドバーの Jobs Icon[ワークフロー] の順にクリックします。
  2. [名前] 列で、ジョブ名をクリックします。 サイド パネルにジョブの詳細が表示されます。
  3. [ジョブの詳細] パネルで [トリガーの追加] をクリックし、[トリガーの種類][スケジュール済み] を選択します。
  4. 期間、開始時刻、タイム ゾーンを指定します。 必要に応じて、[Cron 構文の表示] チェックボックスをオンにして、[Quartz Cron 構文] でスケジュールを表示および編集します。
  5. [保存] をクリックします。

詳細情報