Delta Live Tables を使用してデータを変換する

この記事では、Delta Live Tables を使用してデータセットで変換を宣言し、クエリ ロジックを利用したレコードの処理方法を指定する方法について説明します。 また、Delta Live Tables パイプラインを構築する際に役立つ一般的な変換パターンの例もいくつか紹介しています。

DataFrame を返す任意のクエリに対してデータセットを定義できます。 Delta Live Tables パイプラインでは、Apache Spark の組み込み操作、UDF、カスタム ロジック、MLflow モデルを変換として使用できます。 Delta Live Tables パイプラインにデータが取り込まれたら、アップストリーム ソースに対して新しいデータセットを定義し、新しいストリーミング テーブル、具体化されたビュー、ビューを作成することができます。

Delta Live Tables でステートフル処理を効果的に実行する方法については、「Delta Live Tables でウォーターマークを使ってステートフル処理を最適化する」を参照してください。

ビュー、具体化されたビュー、ストリーミング テーブルを使用する場合

パイプラインを効率的かつ保守可能にするために、パイプライン クエリを実装する際には、最適なデータセット タイプを選択してください。

次の場合は、ビューの使用を検討してください。

  • 大規模または複雑なクエリを、より管理しやすいクエリに分割したい場合。
  • 期待値を使って中間結果を検証したい。
  • ストレージとコンピューティングのコストを削減したい。クエリ結果の実体化は必要としない。 テーブルは具体化されているため、追加の計算とストレージ リソースが必要です。

次の場合は、具体化されたビューの使用を検討してください。

  • 複数のダウンストリーム クエリがテーブルを消費する。 ビューはオンデマンドで計算されるため、ビューをクエリするたびにビューは再計算されます。
  • その他のパイプライン、ジョブ、クエリでテーブルを使用する。 ビューは具体化されていないため、同じパイプラインの中でしか使用できません。
  • 開発中にクエリの結果を見たい場合。 テーブルは具体化されており、閲覧や照会をパイプラインの外部でできるため、開発中にテーブルを使用することで計算の正しさを検証するのに役立ちます。 検証後、具体化を必要としないクエリをビューに変換します。

次の場合は、ストリーミング テーブルの使用を検討してください。

  • クエリが、継続的またはインクリメンタルに増加しているデータ ソースに対して定義されている。
  • クエリ結果はインクリメンタルに計算する必要がある。
  • パイプラインに高いスループットと低遅延が求められる。

Note

ストリーミング テーブルは、常にストリーミング ソースに対して定義されます。 APPLY CHANGES INTO でストリーミング ソースを使用して、CDC フィードから更新を適用することもできます。 「APPLY CHANGES API: Delta Live Tables の変更データ キャプチャを簡略化する」を参照してください。

ストリーミング テーブルと具体化されたビューを 1 つのパイプラインに結合する

ストリーミング テーブルは、Apache Spark 構造化ストリーミングの処理の保証を継承し、追加専用のデータ ソースからのクエリを処理するように構成されています。ここでは、新しい行は変更されるのではなく、常にソース テーブルに挿入されます。

Note

ストリーミング テーブルには既定では追加専用のデータ ソースが必要とされますが、更新や削除が発生するような別のストリーミング テーブルをストリーミング ソースにする場合は、その動作をskipChangeCommits フラグでオーバーライドできます

一般的なストリーミング パターンには、パイプラインで初期データセットを作成するためのソース データの取り込みが含まれます。 これらの初期データセットは、一般的にブロンズ テーブルと呼ばれ、多くの場合、単純な変換を実行します。

これに対し、パイプライン内の最終的なテーブル (一般的にゴールド テーブルと呼ばれます) では、多くの場合、複雑な集計、または APPLY CHANGES INTO 操作の対象であるソースからの読み取りが必要になります。 これらの操作は本質的に追加ではなく更新を作成するため、ストリーミング テーブルへの入力としてサポートされていません。 これらの変換は、具体化されたビューに適しています。

ストリーミング テーブルと具体化されたビューを 1 つのパイプラインに組み合わせると、パイプラインを簡素化し、コストがかかる生データの再取り込みまたは再処理を避け、SQL の全機能を活用して効率的にエンコードおよびフィルター処理されたデータセットに対して複雑な集計を計算することができます。 次の例は、この種類の混合処理を示しています。

Note

これらの例では、自動ローダーを使用してクラウド ストレージからファイルを読み込みます。 Unity Catalog が有効になったパイプラインで自動ローダーを使用してファイルを読み込むには、外部の場所を使用する必要があります。 Delta Live Tables での Unity Catalog の使用の詳細については、「Delta Live Tables パイプラインでの Unity Catalog の使用」を参照してください。

Python

@dlt.table
def streaming_bronze():
  return (
    # Since this is a streaming source, this table is incremental.
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("abfss://path/to/raw/data")
  )

@dlt.table
def streaming_silver():
  # Since we read the bronze table as a stream, this silver table is also
  # updated incrementally.
  return dlt.read_stream("streaming_bronze").where(...)

@dlt.table
def live_gold():
  # This table will be recomputed completely by reading the whole silver table
  # when it is updated.
  return dlt.read("streaming_silver").groupBy("user_id").count()

SQL

CREATE OR REFRESH STREAMING TABLE streaming_bronze
AS SELECT * FROM cloud_files(
  "abfss://path/to/raw/data", "json"
)

CREATE OR REFRESH STREAMING TABLE streaming_silver
AS SELECT * FROM STREAM(LIVE.streaming_bronze) WHERE...

CREATE OR REFRESH LIVE TABLE live_gold
AS SELECT count(*) FROM LIVE.streaming_silver GROUP BY user_id

自動ローダーを使用して、増分処理のために Azure Storage から JSON ファイルを効率的に読み取る方法について説明します。

ストリーム静的結合

ストリーム静的結合は、主に静的なディメンション テーブルを使用して追加専用のデータの連続したストリームを非正規化する場合に最適な選択肢です。

パイプラインが更新されるたびに、ストリームからの新しいレコードが静的テーブルの最新のスナップショットと結合されます。 ストリーミング テーブルからの対応するデータが処理された後に、レコードが静的テーブルで追加または更新された場合は、完全な更新が実行されない限り、結果のレコードは再計算されません。

トリガーされた実行に対して構成されたパイプラインでは、静的テーブルは更新が開始された時点での結果を返します。 連続実行に対して構成されたパイプラインでは、テーブルで更新を処理するたびに、最新バージョンの静的テーブルに対してクエリが実行されます。

ストリーム静的結合の例を以下に示します。

Python

@dlt.table
def customer_sales():
  return dlt.read_stream("sales").join(dlt.read("customers"), ["customer_id"], "left")

SQL

CREATE OR REFRESH STREAMING TABLE customer_sales
AS SELECT * FROM STREAM(LIVE.sales)
  INNER JOIN LEFT LIVE.customers USING (customer_id)

集計を効率的に計算する

カウント、最小、最大、合計などの単純な分散集計や、平均または標準偏差などの代数集計も、ストリーミング テーブルを使用してインクリメンタルに計算できます。 Databricks では、グループの数が限られているクエリ (GROUP BY country 句を含むクエリなど) には増分集計をお勧めします。 更新ごとに新しい入力データだけが読み取られます。

増分集計を実行する Delta Live Tables クエリの作成の詳細については、ウォーターマークを使用したウィンドウ集計の実行に関する記事を参照してください。

Delta Live Tables パイプラインで MLFlow モデルを使用する

Note

Unity Catalog 対応パイプラインで MLflow モデルを使うには、preview チャネルを使うようにパイプラインを構成する必要があります。 current チャネルを使うには、Hive メタストアに公開するようにパイプラインを構成する必要があります。

Delta Live Tables パイプラインでは、MLflow でトレーニング済みのモデルを使用できます。 MLflow モデルは Azure Databricks で変換として扱われます。つまり、Spark DataFrame の入力に基づいて動作し、結果を Spark DataFrame として返します。 Delta Live Tables では DataFrames に対してデータセットを定義するため、わずか数行のコードで、MLflow を活用した Apache Spark ワークロードを Delta Live Tables に変換することができます。 MLflow の詳細については、「MLflow を使用した機械学習ライフサイクル管理」を参照してください。

MLflow モデルを呼び出す Python ノートブックが既にある場合は、@dlt.table デコレーターを使用して、変換結果を返すように関数が定義されていることを確認すると、このコードを Delta Live Tables に適合させることができます。 Delta Live Tables では既定では MLflow がインストールされないため、ノートブックの上部で %pip install mlflow というコマンドを追加し、mlflowdlt をインポートしてください。 Delta Live Tables 構文の概要については、「例: ニューヨークの赤ちゃんの名前のデータを取り込んで処理する」を参照してください。

Delta Live Tables で MLflow モデルを使用するには、次の手順を実行します。

  1. MLflow モデルの実行 ID とモデル名を取得します。 実行 ID とモデル名は、MLflow モデルの URI の構築に使用されます。
  2. この URI を使用して、MLflow モデルを読み込む Spark UDF を定義します。
  3. テーブル定義で UDF を呼び出して、MLflow モデルを使用します。

次の例は、このパターンの基本的な構文を示しています。

%pip install mlflow

import dlt
import mlflow

run_id= "<mlflow-run-id>"
model_name = "<the-model-name-in-run>"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)

@dlt.table
def model_predictions():
  return dlt.read(<input-data>)
    .withColumn("prediction", loaded_model_udf(<model-features>))

完全な例として、次のコードでは、ローン リスク データでトレーニングされた MLflow モデルを読み込む loaded_model_udf という名前の Spark UDF を定義します。 予測を行うために使用するデータ列は、引数として UDF に渡されます。 テーブル loan_risk_predictions では、loan_risk_input_data の各行の予測を計算します。

%pip install mlflow

import dlt
import mlflow
from pyspark.sql.functions import struct

run_id = "mlflow_run_id"
model_name = "the_model_name_in_run"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)

categoricals = ["term", "home_ownership", "purpose",
  "addr_state","verification_status","application_type"]

numerics = ["loan_amnt", "emp_length", "annual_inc", "dti", "delinq_2yrs",
  "revol_util", "total_acc", "credit_length_in_years"]

features = categoricals + numerics

@dlt.table(
  comment="GBT ML predictions of loan risk",
  table_properties={
    "quality": "gold"
  }
)
def loan_risk_predictions():
  return dlt.read("loan_risk_input_data")
    .withColumn('predictions', loaded_model_udf(struct(features)))

手動での削除または更新の保持

Delta Live Tables を使用すると、テーブルからレコードを手動で削除または更新し、更新操作を実行してダウンストリーム テーブルを再計算することができます。

既定では、パイプラインが更新されるたびに Delta Live Tables で入力データに基づいてテーブルの結果が再計算されるため、削除されたレコードがソース データから再度読み込まれないようにする必要があります。 pipelines.reset.allowed テーブル プロパティを false に設定すると、テーブルに対する更新が抑止されますが、テーブルへのインクリメンタルな書き込みが抑制されたり、新しいデータのテーブルへの流入が抑止されたりすることはありません。

次の図は、2 つのストリーミング テーブルを使用した例を示しています。

  • raw_user_table でソースから生のユーザー データを取り込む。
  • bmi_table は、raw_user_table の体重と身長を使用して BMI スコアをインクリメンタルに計算する。

raw_user_table からユーザー レコードを手動で削除または更新し、bmi_table を再計算する必要があります。

データ保持の図

次のコードでは、pipelines.reset.allowed テーブル プロパティを false に設定し、raw_user_table の完全更新を無効にして、意図した変更が経時的に維持されるようにしますが、パイプラインの更新の実行時にダウンストリーム テーブルが再計算されることを示しています。

CREATE OR REFRESH STREAMING TABLE raw_user_table
TBLPROPERTIES(pipelines.reset.allowed = false)
AS SELECT * FROM cloud_files("/databricks-datasets/iot-stream/data-user", "csv");

CREATE OR REFRESH STREAMING TABLE bmi_table
AS SELECT userid, (weight/2.2) / pow(height*0.0254,2) AS bmi FROM STREAM(LIVE.raw_user_table);