次の方法で共有


チュートリアル: 最初の Delta Live Tables パイプラインを実行する

重要

サーバーレス DLT パイプラインは、パブリック プレビュー段階にあります。

このチュートリアルでは、Databricks ノートブックのコードから Delta Live Tables パイプラインを構成し、パイプラインの更新をトリガーしてパイプラインを実行する方法について説明します。 このチュートリアルには、サンプル データセットを取り込んで処理するパイプラインの例と、PythonSQL インターフェイスを使用するコードの例が含まれています。 また、このチュートリアルの手順を使って、適切に定義された Delta Live Tables 構文を持つ任意のノートブックでパイプラインを作成することもできます。

Delta Live Tables パイプラインを構成し、Azure Databricks ワークスペース UI または API、CLI、Databricks アセット バンドルなどの自動化されたツール オプションを使って、または Databricks ワークフロー内のタスクとして、更新をトリガーできます。 Delta Live Tables の機能と機能を理解するには、最初に UI を使用してパイプラインを作成して実行することをお勧めします。 さらに、UI でパイプラインを構成すると、Delta Live Tables によって、プログラムによるワークフローの実装に使用できるパイプライン用 JSON 構成が生成されます。

Delta Live Tables の機能を見るため、このチュートリアルの例では、一般公開されているデータセットをダウンロードします。 ただし、Databricks には、データ ソースに接続し、実際のユース ケースを実装するパイプラインで使われるデータを取り込む方法がいくつかあります。 「Delta Live Tables でデータを取り込む」をご覧ください。

要件

  • パイプラインを開始するには、クラスター作成のアクセス許可、または Delta Live Tables クラスターを定義するクラスター ポリシーへのアクセス権が必要です。 Delta Live Tables ランタイムでは、パイプラインを実行する前にクラスターを作成します。ユーザーが適切なアクセス許可を持たない場合、これは失敗します。

  • このチュートリアルの例を使うには、ワークスペースで Unity Catalog が有効になっている必要があります。

  • Unity Catalog での次のアクセス許可が必要です。

    • my-volume ボリュームの READ VOLUMEWRITE VOLUME、または ALL PRIVILEGES
    • default スキーマの USE SCHEMA または ALL PRIVILEGES
    • main カタログの USE CATALOG または ALL PRIVILEGES

    これらのアクセス許可を設定するには、Databricks 管理者に確認するか、「Unity Catalog の権限とセキュリティ保護可能なオブジェクト」を参照してください。

  • このチュートリアルの例では、Unity Catalog ボリュームを使ってサンプル データを格納します。 これらの例を使うには、ボリュームを作成し、そのボリュームのカタログ、スキーマ、ボリュームの名前を使って、例で使われるボリューム パスを設定します。

Note

ワークスペースで Unity Catalog が有効になっていない場合は、Unity Catalog を必要としない例を含むノートブックがこの記事に添付されています。 これらの例を使うには、パイプラインの作成時にストレージ オプションとして Hive metastore を選んでください。

Delta Live Tables のクエリを実行する場所

Delta Live Tables クエリは主に Databricks ノートブックで実装されますが、Delta Live Tables はノートブック セルで対話形式で実行するようには設計されていません。 Databricks ノートブックで Delta Live Tables 構文を含むセルを実行すると、エラー メッセージが表示されます。 クエリを実行するには、パイプラインの一部としてノートブックを構成する必要があります。

重要

  • Delta Live Tables に対するクエリを記述するとき、ノートブックのセルごとの実行順序に依存することはできません。 Delta Live Tables は、ノートブックで定義されているすべてのコードを評価して実行しますが、その実行モデルはノートブックの [すべてを実行] コマンドとは異なります。
  • Delta Live Tables の 1 つのソース コード ファイルに言語を混在させることはできません。 たとえば、1 つのノートブックには Python クエリまたは SQL クエリのみを含めることができます。 パイプラインで複数の言語を使う必要がある場合は、言語固有の複数のノートブックまたはファイルをパイプラインで使います。

ファイルに格納されている Python コードを使うこともできます。 たとえば、Python パイプラインにインポートできる Python モジュールを作成したり、SQL クエリで使う Python ユーザー定義関数 (UDF) を定義したりできます。 Python モジュールのインポートについては、「Git フォルダーまたはワークスペース ファイルから Python モジュールをインポートする」をご覧ください。 Python UDF の使用については、「ユーザー定義スカラー関数 - Python」をご覧ください。

例: ニューヨークの赤ちゃんの名前のデータを取り込んで処理する

この記事の例では、ニューヨーク州の赤ちゃんの名前 のレコードを含む一般公開されているデータセットを使います。 これらの例は、Delta Live Tables パイプラインを使って次のことを行う方法が示されています。

  • 公開されているデータセットからテーブルに生の CSV データを読み取る。
  • 未加工のデータ テーブルからレコードを読み取り、Delta Live Tables の期待値を使用して、クレンジング済みのデータを含む新しいテーブルを作成します。
  • 派生データセットを作成する Delta Live Tables クエリへの入力として、クレンジング済みのレコードを使う。

このコードは、メダリオン アーキテクチャのシンプルな例を示しています。 「メダリオン レイクハウス アーキテクチャとは」を参照してください。

この例の実装は、PythonSQL のインターフェイスについて提供されています。 手順に従ってコード例を含む新しいノートブックを作成しても、「パイプラインを作成する」までスキップし、このページで提供されているノートブックのいずれかを使ってもかまいません。

Python を使用して Delta Live Tables パイプラインを実装する

Delta Live Tables データセットを作成する Python コードでは、Spark エクスペリエンス用の PySpark または Pandas を使用するユーザーになじみのある DataFrame を返す必要があります。 DataFrame に慣れていないユーザーの場合は、Databricks では SQL インターフェイスを使うことをお勧めします。 「SQL を使用して Delta Live Tables パイプラインを実装する」をご覧ください。

すべての Delta Live Tables の Python API は、dlt モジュールに実装されます。 Python で実装される Delta Live Tables パイプライン コードでは、Python のノートブックとファイルの先頭で dlt モジュールを明示的にインポートする必要があります。 Delta Live Tables は、重要な点で多くの Python スクリプトとは異なります。データ インジェストと変換を実行して Delta Live Tables データセットを作成する関数は呼び出しません。 Delta Live Tables は代わりに、パイプラインに読み込まれるすべてのファイルの dlt モジュールのデコレーター関数を解釈し、データフロー グラフを構築します。

このチュートリアルの例を実装するには、次の Python コードをコピーして新しい Python ノートブックに貼り付けます。 例の各コード スニペットを、説明されている順序でノートブックの個別のセルに追加する必要があります。 ノートブックを作成するためのオプションを確認するには、「ノートブックを作成する」を参照してください。

Note

Python インターフェイスを使用してパイプラインを作成する場合、既定では、テーブル名は関数名によって定義されます。 たとえば、次の Python の例では、baby_names_rawbaby_names_preparedtop_baby_names_2021 という名前の 3 つのテーブルを作成します。 name パラメーターを使用すると、テーブル名をオーバーライドできます。 「Delta Live Tables の具体化されたビューまたはストリーミング テーブルを作成する」を参照してください。

Delta Live Tables モジュールをインポートする

すべての Delta Live Tables の Python API は、dlt モジュールに実装されます。 Python ノートブックとファイルの先頭で dlt モジュールを明示的にインポートします。

次の例は、このインポートと、pyspark.sql.functions に対するインポート ステートメントを示しています。

import dlt
from pyspark.sql.functions import *

データをダウンロードする

この例のデータを取得するには、次のように CSV ファイルをダウンロードし、ボリュームに保存します。

import os

os.environ["UNITY_CATALOG_VOLUME_PATH"] = "/Volumes/<catalog-name>/<schema-name>/<volume-name>/"
os.environ["DATASET_DOWNLOAD_URL"] = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
os.environ["DATASET_DOWNLOAD_FILENAME"] = "rows.csv"

dbutils.fs.cp(f"{os.environ.get('DATASET_DOWNLOAD_URL')}", f"{os.environ.get('UNITY_CATALOG_VOLUME_PATH')}{os.environ.get('DATASET_DOWNLOAD_FILENAME')}")

<catalog-name><schema-name><volume-name> を、Unity Catalog ボリュームのカタログ、スキーマ、ボリュームの名前に置き換えます。

オブジェクト ストレージ内のファイルからテーブルを作成する

Delta Live Tables は、Azure Databricks でサポートされているすべての形式からのデータの読み込みをサポートしています。 「データ形式のオプション」を参照してください。

@dlt.table デコレーターは、関数によって返される DataFrame の結果を含むテーブルを作成するように Delta Live Tables に指示します。 Spark データ フレームを返す Python 関数定義の前に @dlt.table デコレーターを追加して、Delta Live Tables に新しいテーブルを登録します。 テーブル名として関数名を使用し、テーブルにわかりやすいコメントを追加する方法を次の例に示します。

@dlt.table(
  comment="Popular baby first names in New York. This data was ingested from the New York State Department of Health."
)
def baby_names_raw():
  df = spark.read.csv(f"{os.environ.get('UNITY_CATALOG_VOLUME_PATH')}{os.environ.get('DATASET_DOWNLOAD_FILENAME')}", header=True, inferSchema=True)
  df_renamed_column = df.withColumnRenamed("First Name", "First_Name")
  return df_renamed_column

パイプライン内のアップストリーム データセットからテーブルを追加する

dlt.read() を使用して、現在の Delta Live Tables パイプラインで宣言されている他のデータセットからデータを読み取ることができます。 この方法で新しいテーブルを宣言すると、更新を実行する前に Delta Live Tables が自動的に解決する依存関係が作成されます。 次のコードには、期待に応えたデータ品質の監視と適用の例も含まれています。 「Delta Live Tables を使用してデータ品質を管理する」を参照してください。

@dlt.table(
  comment="New York popular baby first name data cleaned and prepared for analysis."
)
@dlt.expect("valid_first_name", "First_Name IS NOT NULL")
@dlt.expect_or_fail("valid_count", "Count > 0")
def baby_names_prepared():
  return (
    dlt.read("baby_names_raw")
      .withColumnRenamed("Year", "Year_Of_Birth")
      .select("Year_Of_Birth", "First_Name", "Count")
  )

エンリッチされたデータ ビューを含むテーブルを作成する

Delta Live Tables はパイプラインへの更新を一連の依存関係グラフとして処理するため、特定のビジネス ロジックを使用してテーブルを宣言することで、ダッシュボード、BI、分析を強化する高度にエンリッチされたビューを宣言できます。

Delta Live Tables のテーブルは、概念的には具体化されたビューと同等のものです。 Spark の従来のビューは、ビューのクエリが実行されるたびにロジックを実行しますが、Delta Live Tables のテーブルは最新バージョンのクエリ結果をデータ ファイルに保存します。 Delta Live Tables はパイプライン内のすべてのデータセットの更新を管理するため、具体化されたビューの待機時間の要件に合うようにパイプラインの更新をスケジュールすることで、これらのテーブルに対するクエリに使用可能な最新バージョンのデータが含まれるようにできます。

次のコードで定義されるテーブルは、パイプライン内のアップストリーム データから派生した具体化されたビューと概念的に類似します。

@dlt.table(
  comment="A table summarizing counts of the top baby names for New York for 2021."
)
def top_baby_names_2021():
  return (
    dlt.read("baby_names_prepared")
      .filter(expr("Year_Of_Birth == 2021"))
      .groupBy("First_Name")
      .agg(sum("Count").alias("Total_Count"))
      .sort(desc("Total_Count"))
      .limit(10)
  )

ノートブックを使うパイプラインを構成するには、「パイプラインを作成する」をご覧ください。

SQL を使用して Delta Live Tables パイプラインを実装する

Databricks では、SQL ユーザーが Azure Databricks 上で新しい ETL、インジェスト、変換パイプラインを構築する方法として、Delta Live Tables with SQL が推奨されます。 Delta Live Tables の SQL インターフェイスでは、多くの新しいキーワード、コンストラクト、テーブル値関数で、標準の Spark SQL が拡張されています。 標準 SQL に追加されたこれらの機能により、ユーザーはデータセット間の依存関係を宣言でき、新しいツールや追加の概念を学習しなくても、運用環境グレードのインフラストラクチャをデプロイできます。

Spark DataFrame を熟知しているユーザーや、メタプログラミング操作のような、SQL では実装が困難な、より広範なテストと操作のサポートを必要とするユーザーには、Python インターフェイスを使うことをお勧めします。 「例: ニューヨークの赤ちゃんの名前のデータを取り込んで処理する」をご覧ください。

データをダウンロードする

この例のデータを取得するには、次のコードをコピーし、新しいノートブックに貼り付けてからノートブックを実行します。 ノートブックを作成するためのオプションを確認するには、「ノートブックを作成する」を参照してください。

%sh
wget -O "/Volumes/<catalog-name>/<schema-name>/<volume-name>/babynames.csv" "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"

<catalog-name><schema-name><volume-name> を、Unity Catalog ボリュームのカタログ、スキーマ、ボリュームの名前に置き換えます。

Unity Catalog のファイルからテーブルを作成する

この例の残りの部分では、次の SQL スニペットをコピーし、前のセクションのノートブックとは別の新しい SQL ノートブックに貼り付けます。 各サンプル SQL スニペットは、説明されている順序でノートブック内の独自のセルに追加する必要があります。

Delta Live Tables は、Azure Databricks でサポートされているすべての形式からのデータの読み込みをサポートしています。 「データ形式のオプション」を参照してください。

すべての Delta Live Tables SQL ステートメントでは、CREATE OR REFRESH 構文とセマンティクスが使用されます。 パイプラインを更新すると、Delta Live Tables では、テーブルの論理的に正しい結果が増分処理で達成できるか、完全な再計算が必要かが判断されます。

次の例では、Unity Catalog ボリュームに格納されている CSV ファイルからデータを読み込んでテーブルを作成します。

CREATE OR REFRESH LIVE TABLE baby_names_sql_raw
COMMENT "Popular baby first names in New York. This data was ingested from the New York State Department of Health."
AS SELECT Year, `First Name` AS First_Name, County, Sex, Count FROM read_files(
  '/Volumes/<catalog-name>/<schema-name>/<volume-name>/babynames.csv',
  format => 'csv',
  header => true,
  mode => 'FAILFAST')

<catalog-name><schema-name><volume-name> を、Unity Catalog ボリュームのカタログ、スキーマ、ボリュームの名前に置き換えます。

パイプライン内のアップストリーム データセットからテーブルを追加する

live 仮想スキーマを使用して、現在の Delta Live Tables パイプラインで宣言されている他のデータセットからデータをクエリに登録することができます。 この方法で新しいテーブルを宣言すると、更新を実行する前に Delta Live Tables が自動的に解決する依存関係が作成されます。 live スキーマは Delta Live Tables に実装されたカスタム キーワードであり、データセットを公開する場合にターゲット スキーマの代わりに使用できます。 「Unity Catalog を Delta Live Tables パイプラインで使う」または Delta Live Tables から Hive メタストアへのデータの発行に関する記事を参照してください。

次のコードには、期待に応えたデータ品質の監視と適用の例も含まれています。 「Delta Live Tables を使用してデータ品質を管理する」を参照してください。

CREATE OR REFRESH LIVE TABLE baby_names_sql_prepared(
  CONSTRAINT valid_first_name EXPECT (First_Name IS NOT NULL),
  CONSTRAINT valid_count EXPECT (Count > 0) ON VIOLATION FAIL UPDATE
)
COMMENT "New York popular baby first name data cleaned and prepared for analysis."
AS SELECT
  Year AS Year_Of_Birth,
  First_Name,
  Count
FROM live.baby_names_sql_raw;

エンリッチされたデータ ビューを作成する

Delta Live Tables はパイプラインへの更新を一連の依存関係グラフとして処理するため、特定のビジネス ロジックを使用してテーブルを宣言することで、ダッシュボード、BI、分析を強化する高度にエンリッチされたビューを宣言できます。

ライブ テーブルは、具体化されたビューと概念的に同等です。 Spark の従来のビューは、ビューのクエリが実行されるたびにロジックを実行しますが、ライブ テーブルは最新バージョンのクエリ結果をデータ ファイルに保存します。 Delta Live Tables はパイプライン内のすべてのデータセットの更新を管理するため、具体化されたビューの待機時間の要件に合うようにパイプラインの更新をスケジュールすることで、これらのテーブルに対するクエリに使用可能な最新バージョンのデータが含まれるようにできます。

次のコードでは、アップストリーム データのエンリッチ済みの具体化されたビューを作成します。

CREATE OR REFRESH LIVE TABLE top_baby_names_sql_2021
COMMENT "A table summarizing counts of the top baby names for New York for 2021."
AS SELECT
  First_Name,
  SUM(Count) AS Total_Count
FROM live.baby_names_sql_prepared
WHERE Year_Of_Birth = 2021
GROUP BY First_Name
ORDER BY Total_Count DESC
LIMIT 10;

ノートブックを使うパイプラインを構成するには、続けて「パイプラインを作成する」をご覧ください。

パイプラインを作成する

Note

サーバーレス DLT パイプライン (パブリック プレビュー) の場合、コンピューティング リソースはフル マネージドであるため、パイプラインに対して [サーバーレス] を選択した場合、コンピューティング設定は使用できません。

サーバーレス DLT パイプラインを有効にする方法については、Azure Databricks アカウント チームにお問い合わせください。

Delta Live Tables では、Delta Live Tables 構文を使用して、ノートブックまたはファイル ("ソース コード" または "ライブラリ" と呼ばれます) で定義されている依存関係を解決すると、パイプラインが作成されます。 各ソース コード ファイルに含めることができる言語は 1 つだけですが、パイプラインではさまざまな言語のライブラリを混在させることができます。

  1. サイドバーの [デルタ ライブ テーブル] をクリックして [パイプラインの作成] をクリックします。
  2. パイプラインに名前を付けます。
  3. (省略可能) サーバーレス DLT パイプラインを使用してパイプラインを実行するには、[サーバーレス] チェック ボックスをオンにします。 [サーバーレス] を選択すると、[コンピューティング] 設定が UI から削除されます。 「サーバーレス コンピューティングで Delta Live Tables を使用してフル マネージド パイプラインを作成する」を参照してください。
  4. (省略可能) [製品エディション] を選択します。
  5. [パイプライン モード][トリガー] を選択します。
  6. パイプラインのソース コードを含む 1 つ以上のノートブックを構成します。 [パス] テキストボックスで、ノートブックへのパスを入力するか、ファイル ピッカー アイコン をクリックしてノートブックを選択します。
  7. パイプラインによって発行されたデータセット (Hive メタストアまたは Unity カタログ) の宛先を選択します。 「データセットを公開する」を参照してください。
    • Hive メタストア:
      • (省略可能) 必要に応じて、パイプラインからの出力データの保存場所を入力してください。 [保存場所] を空のままにすると、既定の場所が使用されます。
      • (省略可能) データセットを Hive メタストアに発行するには、ターゲット スキーマを指定します。
    • Unity Catalog: データセットを Unity Catalog に発行するための [カタログ][ターゲット スキーマ] を指定します。
  8. (省略可能) [サーバーレス] を選択していない場合は、パイプラインのコンピューティング設定を構成できます。 コンピューティング設定のオプションについては、「Delta Live Tables のパイプライン設定を構成する」を参照してください。
  9. (省略可能) [通知の追加] をクリックして、パイプライン イベントの通知を受信する 1 つ以上のメール アドレスを構成してください。 「パイプライン イベントの通知を追加する」を参照してください。
  10. (省略可能) パイプラインの詳細設定を構成します。 詳細設定のオプションについては、「Delta Live Tables のパイプライン設定を構成する」を参照してください。
  11. Create をクリックしてください。

[作成] をクリックすると、[パイプラインの詳細] ページが表示されます。 [Delta Live Tables] タブでパイプライン名をクリックしてパイプラインにアクセスすることもできます。

パイプラインの更新を開始する

新しいパイプラインの更新プログラムを開始するには、上部パネルで Delta Live Tables の開始アイコン ボタンをクリックしてください。 パイプラインが開始されたことを伝えるメッセージがシステムから返されます。

更新が正常に開始されると、Delta Live Tables システムでは次のことが行われます。

  1. Delta Live Tables システムによって作成されたクラスター構成を使用してクラスターを起動します。 カスタムのクラスター構成を指定することもできます。
  2. 存在しないテーブルを作成し、スキーマが既存のテーブルに対して正しいことを確認します。
  3. 使用可能な最新のデータを使用してテーブルを更新します。
  4. 更新が完了したら、クラスターをシャットダウンします。

Note

実行モードは既定で [実稼働] に設定されており、更新ごとにエフェメラル コンピューティング リソースがデプロイされます。 [開発] モードを使用してこの動作を変更し、開発およびテスト中に同じコンピューティング リソースを複数のパイプラインの更新に使用することができます。 「開発と運用のモード」を参照してください。

データセットを公開する

Hive メタストアまたは Unity Catalog にテーブルを発行すると、Delta Live Tables データセットがクエリに使用できるようになります。 データを発行するターゲットを指定しない場合、Delta Live Tables パイプラインで作成されたテーブルには、同じパイプラインの他の操作によってのみアクセスできます。 Delta Live Tables から Hive メタストアへのデータの発行に関する記事、または「Unity Catalog を Delta Live Tables パイプラインで使う」を参照してください。

ソース コード ノートブックの例

これらのノートブックを Azure Databricks ワークスペースにインポートし、それらを使って Delta Live Tables パイプラインをデプロイできます。 「パイプラインを作成する」を参照してください。

Delta Live Tables Python ノートブックで作業を開始する

ノートブックを入手

Delta Live Tables SQL ノートブックで作業を開始する

ノートブックを入手

Unity Catalog を使用しないワークスペース用のソース コード ノートブックの例

これらのノートブックを、Unity Catalog が有効になっていない Azure Databricks ワークスペースにインポートし、それらを使って Delta Live Tables パイプラインをデプロイできます。 「パイプラインを作成する」を参照してください。

Delta Live Tables Python ノートブックで作業を開始する

ノートブックを入手

Delta Live Tables SQL ノートブックで作業を開始する

ノートブックを入手