次の方法で共有


パイプラインの期待に応えたデータ品質の管理

ETL パイプラインを通過するデータを検証する品質制約を適用するには、期待値を使用します。 期待は、データ品質メトリックに関するより大きな洞察を提供し、無効なレコードを検出するときに更新を失敗させたり、レコードを削除したりすることができます。

この記事では、構文の例や動作オプションなど、期待される機能の概要について説明します。 より高度なユース ケースと推奨されるベスト プラクティスについては、「 期待される推奨事項と高度なパターン」を参照してください。

Lakeflow 宣言型パイプラインの期待されるフロー グラフ

期待値とは何ですか?

クエリを通過する各レコードにデータ品質チェックを適用するパイプライン具体化ビュー、ストリーミングテーブル、またはビュー作成ステートメントにおいて、期待値は任意の句として使用されます。 期待値では、標準の SQL ブールステートメントを使用して制約を指定します。 1 つのデータセットに対する複数の期待値を組み合わせて、パイプライン内のすべてのデータセット宣言に期待値を設定できます。

次のセクションでは、期待される 3 つのコンポーネントについて説明し、構文の例を示します。

期待される名前

各期待値には名前が必要です。これは、予測を追跡および監視するための識別子として使用されます。 検証対象のメトリックを伝える名前を選択します。 次の例では、年齢が 0 ~ 120 歳であることを確認する valid_customer_age を定義します。

重要

期待値の名前は、特定のデータセットに対して一意である必要があります。 パイプライン内の複数のデータセット間で期待値を再利用できます。 ポータブルで再利用可能な期待を参照してください。

Python(プログラミング言語)

@dlt.table
@dlt.expect("valid_customer_age", "age BETWEEN 0 AND 120")
def customers():
  return spark.readStream.table("datasets.samples.raw_customers")

SQL

CREATE OR REFRESH STREAMING TABLE customers(
  CONSTRAINT valid_customer_age EXPECT (age BETWEEN 0 AND 120)
) AS SELECT * FROM STREAM(datasets.samples.raw_customers);

評価する制約

制約句は、各レコードで true または false と評価する必要がある SQL 条件ステートメントです。 制約には、検証対象の実際のロジックが含まれています。 レコードがこの条件に失敗すると、期待値がトリガーされます。

制約では有効な SQL 構文を使用する必要があり、以下を含めることはできません。

  • カスタム Python 関数
  • 外部サービス呼び出し
  • 他のテーブルを参照するサブクエリ

データセット作成ステートメントに追加できる制約の例を次に示します。

Python(プログラミング言語)

Python の制約の構文は次のとおりです。

@dlt.expect(<constraint-name>, <constraint-clause>)

複数の制約を指定できます。

@dlt.expect(<constraint-name>, <constraint-clause>)
@dlt.expect(<constraint2-name>, <constraint2-clause>)

例:

# Simple constraint
@dlt.expect("non_negative_price", "price >= 0")

# SQL functions
@dlt.expect("valid_date", "year(transaction_date) >= 2020")

# CASE statements
@dlt.expect("valid_order_status", """
   CASE
     WHEN type = 'ORDER' THEN status IN ('PENDING', 'COMPLETED', 'CANCELLED')
     WHEN type = 'REFUND' THEN status IN ('PENDING', 'APPROVED', 'REJECTED')
     ELSE false
   END
""")

# Multiple constraints
@dlt.expect("non_negative_price", "price >= 0")
@dlt.expect("valid_purchase_date", "date <= current_date()")

# Complex business logic
@dlt.expect(
  "valid_subscription_dates",
  """start_date <= end_date
    AND end_date <= current_date()
    AND start_date >= '2020-01-01'"""
)

# Complex boolean logic
@dlt.expect("valid_order_state", """
   (status = 'ACTIVE' AND balance > 0)
   OR (status = 'PENDING' AND created_date > current_date() - INTERVAL 7 DAYS)
""")

SQL

SQL の制約の構文は次のとおりです。

CONSTRAINT <constraint-name> EXPECT ( <constraint-clause> )

複数の制約はコンマで区切る必要があります。

CONSTRAINT <constraint-name> EXPECT ( <constraint-clause> ),
CONSTRAINT <constraint2-name> EXPECT ( <constraint2-clause> )

例:

-- Simple constraint
CONSTRAINT non_negative_price EXPECT (price >= 0)

-- SQL functions
CONSTRAINT valid_date EXPECT (year(transaction_date) >= 2020)

-- CASE statements
CONSTRAINT valid_order_status EXPECT (
  CASE
    WHEN type = 'ORDER' THEN status IN ('PENDING', 'COMPLETED', 'CANCELLED')
    WHEN type = 'REFUND' THEN status IN ('PENDING', 'APPROVED', 'REJECTED')
    ELSE false
  END
)

-- Multiple constraints
CONSTRAINT non_negative_price EXPECT (price >= 0),
CONSTRAINT valid_purchase_date EXPECT (date <= current_date())

-- Complex business logic
CONSTRAINT valid_subscription_dates EXPECT (
  start_date <= end_date
  AND end_date <= current_date()
  AND start_date >= '2020-01-01'
)

-- Complex boolean logic
CONSTRAINT valid_order_state EXPECT (
  (status = 'ACTIVE' AND balance > 0)
  OR (status = 'PENDING' AND created_date > current_date() - INTERVAL 7 DAYS)
)

無効なレコードに対するアクション

レコードが検証チェックに失敗した場合の動作を決定するアクションを指定する必要があります。 次の表では、使用可能なアクションについて説明します。

アクション SQL 構文 Python 構文 結果
warn (既定値) EXPECT dlt.expect 無効なレコードがターゲットに書き込まれています。
ドロップ EXPECT ... ON VIOLATION DROP ROW dlt.expect_or_drop 無効なレコードは、データがターゲットに書き込まれる前に削除されます。 削除されたレコードの数は、他のデータセット メトリックと共にログに記録されます。
失敗する EXPECT ... ON VIOLATION FAIL UPDATE dlt.expect_or_fail 無効なレコードでは、更新が成功できません。 再処理する前に手動による介入が必要です。 この期待により、単一のフローの障害が発生し、パイプライン内の他のフローが失敗することはありません。

また、データの損失や削除を避けつつ無効なレコードを検疫するための高度なロジックを実装することもできます。 「無効なレコードの検疫」を参照してください。

想定値追跡メトリック

パイプライン UI から、 warn または drop アクションの追跡メトリックを確認できます。 無効なレコードが検出されると、 fail によって更新が失敗するため、メトリックは記録されません。

期待されるメトリックを表示するには、次の手順を実行します。

  1. Azure Databricks ワークスペースのサイドバーで、ジョブ & パイプライン をクリックします。
  2. パイプラインの 名前 をクリックします。
  3. 期待値が定義されているデータセットをクリックします。
  4. 右側のサイドバーで [ データ品質 ] タブを選択します。

Lakeflow 宣言パイプライン イベント ログに対してクエリを実行すると、データ品質メトリックを表示できます。 イベント ログからのデータ品質のクエリを参照してください。

無効なレコードを保持する

無効なレコードを保持することは、期待される既定の動作です。 expect演算子は、期待に違反するレコードを保持し、制約に合格または失敗したレコードの数に関するメトリックを収集する場合に使用します。 期待に違反するレコードは、有効なレコードと共にターゲット データセットに追加されます。

Python(プログラミング言語)

@dlt.expect("valid timestamp", "timestamp > '2012-01-01'")

SQL

CONSTRAINT valid_timestamp EXPECT (timestamp > '2012-01-01')

無効なレコードを削除する

expect_or_drop演算子を使用して、無効なレコードをさらに処理しないようにします。 期待に違反するレコードは、ターゲット データセットから削除されます。

Python(プログラミング言語)

@dlt.expect_or_drop("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")

SQL

CONSTRAINT valid_current_page EXPECT (current_page_id IS NOT NULL and current_page_title IS NOT NULL) ON VIOLATION DROP ROW

無効なレコードで失敗する

無効なレコードが許容できない場合は、 expect_or_fail 演算子を使用して、レコードの検証に失敗したときにすぐに実行を停止します。 操作がテーブルの更新である場合、システムはトランザクションをアトミックにロールバックします。

Python(プログラミング言語)

@dlt.expect_or_fail("valid_count", "count > 0")

SQL

CONSTRAINT valid_count EXPECT (count > 0) ON VIOLATION FAIL UPDATE

重要

パイプラインに複数の並列フローが定義されている場合、1 つのフローで障害が発生しても、他のフローは失敗しません。

Lakeflow 宣言型パイプラインのフローエラーの説明グラフ

想定値からの更新で失敗した場合のトラブルシューティング

予想違反が原因でパイプラインが失敗した場合は、パイプラインを再実行する前に、無効なデータを正しく処理するようにパイプライン コードを修正する必要があります。

パイプラインが失敗するように構成された期待値は、違反を検出して報告するために必要な情報を追跡するように変換の Spark クエリ プランを変更します。 この情報を使用して、多くのクエリで違反が発生した入力レコードを特定できます。 Lakeflow 宣言パイプラインには、このような違反を報告するための専用のエラー メッセージが用意されています。 期待違反のエラー メッセージの例を次に示します。

[EXPECTATION_VIOLATION.VERBOSITY_ALL] Flow 'sensor-pipeline' failed to meet the expectation. Violated expectations: 'temperature_in_valid_range'. Input data: '{"id":"TEMP_001","temperature":-500,"timestamp_ms":"1710498600"}'. Output record: '{"sensor_id":"TEMP_001","temperature":-500,"change_time":"2024-03-15 10:30:00"}'. Missing input data: false

期待を複数管理する

SQL と Python の両方が 1 つのデータセットで複数の期待をサポートしていますが、複数の期待値をグループ化し、集合的なアクションを指定できるのは Python だけです。

Lakeflow 宣言型パイプラインと複数の期待を持つフローグラフ

複数の期待値をグループ化し、関数 expect_allexpect_all_or_drop、および expect_all_or_failを使用して集合アクションを指定できます。

これらのデコレーターは、Python ディクショナリを引数として受け入れます。ここで、キーは期待値の名前であり、値は期待される制約です。 パイプライン内の複数のデータセットで同じ一連の期待値を再利用できます。 次に、 expect_all Python の各演算子の例を示します。

valid_pages = {"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"}

@dlt.table
@dlt.expect_all(valid_pages)
def raw_data():
  # Create a raw dataset

@dlt.table
@dlt.expect_all_or_drop(valid_pages)
def prepared_data():
  # Create a cleaned and prepared dataset

@dlt.table
@dlt.expect_all_or_fail(valid_pages)
def customer_facing_data():
  # Create cleaned and prepared to share the dataset

制限事項

  • ストリーミング テーブルと具体化されたビューのみが期待値をサポートするため、データ品質メトリックはこれらのオブジェクトの種類に対してのみサポートされます。
  • 次の場合、データ品質メトリックは使用できません。
    • クエリに対する期待値は定義されていません。
    • フローでは、期待値をサポートしていない演算子を使用します。
    • Lakeflow 宣言型パイプライン シンクなどのフローの種類は、期待値をサポートしていません。
    • 特定のフロー実行について、関連付けられているストリーミング テーブルまたは具体化されたビューに対する更新はありません。
    • パイプライン構成には、 pipelines.metrics.flowTimeReporter.enabledなどのメトリックをキャプチャするために必要な設定は含まれません。
  • 場合によっては、 COMPLETED フローにメトリックが含まれていない場合があります。 代わりに、メトリックは、flow_progressの状態を持つRUNNINGイベント内の各マイクロバッチで報告されます。