次の方法で共有


AUTO CDC API: Lakeflow 宣言パイプラインを使用して変更データ キャプチャを簡略化する

Lakeflow 宣言パイプラインは、 AUTO CDC API と AUTO CDC FROM SNAPSHOT API を使用して変更データ キャプチャ (CDC) を簡略化します。

AUTO CDC API は、以前は APPLY CHANGES と呼ばれ、同じ構文を持っていました。

使用するインターフェイスは、変更データのソースによって異なります。

  • 変更データ フィード (CDF) からの変更を処理するには、 AUTO CDC を使用します。
  • AUTO CDC FROM SNAPSHOT (パブリック プレビュー、および Python でのみ使用可能) を使用して、データベース スナップショットの変更を処理します。

以前は、 MERGE INTO ステートメントは、Azure Databricks で CDC レコードを処理するために一般的に使用されていました。 ただし、 MERGE INTO は、レコードの順序が正しくないために正しくない結果が生成される場合や、レコードの順序を再作成するための複雑なロジックが必要になる場合があります。

AUTO CDC API は、Lakeflow 宣言パイプラインの SQL および Python インターフェイスでサポートされています。 AUTO CDC FROM SNAPSHOT API は、Lakeflow 宣言パイプライン Python インターフェイスでサポートされています。

AUTO CDCAUTO CDC FROM SNAPSHOTの両方で、SCD タイプ 1 とタイプ 2 を使用したテーブルの更新がサポートされます。

  • SCD タイプ 1 を使用して、レコードを直接更新します。 更新されたレコードの履歴は保持されません。
  • SCD タイプ 2 を使用して、すべての更新または指定された列セットの更新時にレコードの履歴を保持します。

構文などの参照については、Lakeflow 宣言型パイプライン SQL の AUTO CDCLakeflow 宣言型パイプライン Python の AUTO CDC、およびLakeflow 宣言型パイプライン Python の AUTO CDC FROM SNAPSHOTを参照してください。

この記事では、ソース データの変更に基づいて、Lakeflow 宣言パイプラインのテーブルを更新する方法について説明します。 Delta テーブルの行レベルの変更情報を記録およびクエリする方法については、「 Azure Databricks での Delta Lake 変更データ フィードの使用」を参照してください。

必要条件

CDC API を使用するには、パイプラインを serverlessLakeflow 宣言型パイプラインLakeflow 宣言型パイプライン、または Advancedエディション のいずれかを使用するように構成する必要があります。

CDC は AUTO CDC API でどのように実装されますか?

Lakeflow 宣言パイプラインの AUTO CDC API は、シーケンス外のレコードを自動的に処理することで、CDC レコードの正しい処理を保証し、シーケンス外レコードを処理するための複雑なロジックを開発する必要がなくなります。 ソース データ内でレコードをシーケンスする列を指定する必要があります。この列は、Lakeflow 宣言パイプラインがソース データの適切な順序を単調に増加させる表現として解釈します。 Lakeflow 宣言型パイプラインは、順不同で到着するデータを自動的に処理します。 SCD タイプ 2 の変更の場合、Lakeflow 宣言パイプラインは、適切なシーケンス値をターゲット テーブルの __START_AT 列と __END_AT 列に伝達します。 各シーケンス値にはキーごとに 1 つの個別の更新が必要であり、NULL シーケンス値はサポートされていません。

AUTO CDCを使用して CDC 処理を実行するには、まずストリーミング テーブルを作成し、次に SQL の AUTO CDC ... INTO ステートメントまたは Python の create_auto_cdc_flow() 関数を使用して、変更フィードのソース、キー、シーケンスを指定します。 ターゲット ストリーミング テーブルを作成するには、SQL の CREATE OR REFRESH STREAMING TABLE ステートメントまたは Python の create_streaming_table() 関数を使用します。 SCD タイプ 1 およびタイプ 2 の処理例を参照してください。

構文の詳細については、Lakeflow 宣言型パイプライン の SQL リファレンス または Python リファレンスを参照してください

API を使用して CDC を実装する方法

重要

AUTO CDC FROM SNAPSHOT API はパブリック プレビュー段階です。

AUTO CDC FROM SNAPSHOT は、一連のインオーダー スナップショットを比較してソース データの変更を効率的に判断し、スナップショット内のレコードの CDC 処理に必要な処理を実行する宣言型 API です。 AUTO CDC FROM SNAPSHOT は、Lakeflow 宣言型パイプライン Python インターフェイスでのみサポートされています。

AUTO CDC FROM SNAPSHOT では、複数のソースの種類からのスナップショットの取り込みがサポートされています。

  • 定期的なスナップショット インジェストを使用して、既存のテーブルまたはビューからスナップショットを取り込みます。 AUTO CDC FROM SNAPSHOT には、既存のデータベース オブジェクトからのスナップショットの定期的な取り込みをサポートするシンプルで合理化されたインターフェイスがあります。 パイプラインの更新ごとに新しいスナップショットが取り込まれており、取り込み時間がスナップショット バージョンとして使用されます。 パイプラインが連続モードで実行されると、処理を含むフローのAUTO CDC FROM SNAPSHOT設定によって決定された期間に、パイプラインの更新ごとに複数のスナップショットが取り込まれます。
  • 履歴スナップショット インジェストを使用して、Oracle または MySQL データベースまたはデータ ウェアハウスから生成されたスナップショットなど、データベース スナップショットを含むファイルを処理します。

AUTO CDC FROM SNAPSHOTを使用して任意のソースの種類から CDC 処理を実行するには、まずストリーミング テーブルを作成してから、Python で create_auto_cdc_from_snapshot_flow() 関数を使用して、処理を実装するために必要なスナップショット、キー、およびその他の引数を指定します。 定期的なスナップショット インジェストスナップショット インジェストの履歴の例を参照してください。

API に渡されるスナップショットは、バージョン別の昇順である必要があります。 Lakeflow 宣言型パイプラインで順序が不正なスナップショットが検出されると、エラーが発生します。

構文の詳細については、Lakeflow 宣言型パイプライン の Python リファレンスを参照してください

シーケンス処理に複数の列を使用する

複数の列を順番に並べ替えること (例: タイムスタンプと ID を使用した順序付け) ができます。STRUCT を使用するとそれらを組み合わせられます。最初に STRUCT の最初のフィールドで順序付けを開始し、同じ順番が表れた場合は 2 番目のフィールドを考慮します。

SQL の例:

SEQUENCE BY STRUCT(timestamp_col, id_col)

Python の例:

sequence_by = struct("timestamp_col", "id_col")

制限事項

シーケンス処理に使用する列は、並べ替え可能なデータ型である必要があります。

例: CDF ソース・データを使用した SCD タイプ 1 および SCD タイプ 2 の処理

次のセクションでは、変更データ フィードからのソース イベントに基づいてターゲット テーブルを更新する Lakeflow 宣言型パイプライン SCD タイプ 1 およびタイプ 2 のクエリの例を示します。

  1. 新しいユーザー レコードを作成します。
  2. ユーザー レコードを削除します。
  3. ユーザー レコードを更新します。 SCD タイプ 1 の例では、最後の UPDATE 操作は遅れて到着し、ターゲット テーブルから削除され、順序が不順のイベントの処理を示しています。

次の例では、Lakeflow 宣言パイプラインの構成と更新に関する知識を前提としています。 「チュートリアル: Lakeflow 宣言型パイプラインを使用して変更データ キャプチャを使用して ETL パイプラインを構築する」を参照してください。

これらの例を実行するには、まずサンプル データセットを作成する必要があります。 「テスト データの生成」を参照してください。

これらの例の入力レコードを次に示します。

ユーザーID 名前 都市 運営 シーケンス番号
124 ラウル オアハカ州 INSERT 1
123 イザベル モンテレー INSERT 1
125 メルセデス ティファナ INSERT 2
126 リリー カンクン INSERT 2
123 無効 無効 削除 6
125 メルセデス グアダラハラ UPDATE 6
125 メルセデス メヒカリ UPDATE 5
123 イザベル チワワ UPDATE 5

サンプル データの最後の行のコメントを解除すると、レコードを切り捨てる場所を指定する次のレコードが挿入されます。

ユーザーID 名前 都市 運営 シーケンス番号
無効 無効 無効 切り捨てる 3

次の例には、 DELETE 操作と TRUNCATE 操作の両方を指定するオプションが含まれていますが、それぞれは省略可能です。

SCD タイプ 1 の更新を処理する

次の例では、SCD タイプ 1 の更新の処理を示します。

Python(プログラミング言語)

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.create_auto_cdc_flow(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  apply_as_truncates = expr("operation = 'TRUNCATE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = 1
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

CREATE FLOW flowname AS AUTO CDC INTO
  target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
APPLY AS TRUNCATE WHEN
  operation = "TRUNCATE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 1;

SCD タイプ 1 の例を実行した後、ターゲット テーブルには次のレコードが含まれます。

ユーザーID 名前 都市
124 ラウル オアハカ州
125 メルセデス グアダラハラ
126 リリー カンクン

追加のTRUNCATE レコードで SCD タイプ 1 の例を実行すると、124での126操作により、レコードTRUNCATEおよびsequenceNum=3が切り捨てられ、ターゲット テーブルに次のレコードが含まれます。

ユーザーID 名前 都市
125 メルセデス グアダラハラ

SCD タイプ 2 の更新を処理する

次の例は、SCD タイプ 2 の更新の処理を示しています。

Python(プログラミング言語)

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.create_auto_cdc_flow(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = "2"
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

CREATE FLOW target_flow
AS AUTO CDC INTO
  target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 2;

SCD タイプ 2 の例を実行した後、ターゲット テーブルには次のレコードが含まれます。

ユーザーID 名前 都市 __START_AT __END_AT
123 イザベル モンテレー 1 5
123 イザベル チワワ 5 6
124 ラウル オアハカ州 1 無効
125 メルセデス ティファナ 2 5
125 メルセデス メヒカリ 5 6
125 メルセデス グアダラハラ 6 無効
126 リリー カンクン 2 無効

SCD タイプ 2 クエリでは、ターゲット テーブルの履歴に対して追跡する出力列のサブセットを指定することもできます。 他の列への変更は、新しい履歴レコードを生成するのではなく、その場で更新されます。 次の例では、 city 列を追跡から除外する方法を示します。

次の例では、SCD タイプ 2 でトラック履歴を使用する方法を示します。

Python(プログラミング言語)

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.create_auto_cdc_flow(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = "2",
  track_history_except_column_list = ["city"]
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

CREATE FLOW target_flow
AS AUTO CDC INTO
  target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 2
TRACK HISTORY ON * EXCEPT
  (city)

追加の TRUNCATE レコードなしでこの例を実行した後、ターゲット テーブルには次のレコードが含まれます。

ユーザーID 名前 都市 __START_AT __END_AT
123 イザベル チワワ 1 6
124 ラウル オアハカ州 1 無効
125 メルセデス グアダラハラ 2 無効
126 リリー カンクン 2 無効

テスト データを生成する

次のコードは、このチュートリアルに存在するサンプル クエリで使用するサンプル データセットを生成するために提供されています。 新しいスキーマを作成して新しいテーブルを作成するための適切な資格情報があると仮定すると、ノートブックまたは Databricks SQL でこれらのステートメントを実行できます。 次のコードは、Lakeflow 宣言パイプラインの一部として実行することを意図 していません

CREATE SCHEMA IF NOT EXISTS cdc_data;

CREATE TABLE
  cdc_data.users
AS SELECT
  col1 AS userId,
  col2 AS name,
  col3 AS city,
  col4 AS operation,
  col5 AS sequenceNum
FROM (
  VALUES
  -- Initial load.
  (124, "Raul",     "Oaxaca",      "INSERT", 1),
  (123, "Isabel",   "Monterrey",   "INSERT", 1),
  -- New users.
  (125, "Mercedes", "Tijuana",     "INSERT", 2),
  (126, "Lily",     "Cancun",      "INSERT", 2),
  -- Isabel is removed from the system and Mercedes moved to Guadalajara.
  (123, null,       null,          "DELETE", 6),
  (125, "Mercedes", "Guadalajara", "UPDATE", 6),
  -- This batch of updates arrived out of order. The above batch at sequenceNum 6 will be the final state.
  (125, "Mercedes", "Mexicali",    "UPDATE", 5),
  (123, "Isabel",   "Chihuahua",   "UPDATE", 5)
  -- Uncomment to test TRUNCATE.
  -- ,(null, null,      null,          "TRUNCATE", 3)
);

例: 定期的なスナップショット処理

次の例では、 mycatalog.myschema.mytableに格納されているテーブルのスナップショットを取り込む SCD タイプ 2 の処理を示します。 処理の結果は、 targetという名前のテーブルに書き込まれます。

タイムスタンプ 2024-01-01 00:00:00 の mycatalog.myschema.mytable レコード

価値
1 a1
2 a2

mycatalog.myschema.mytable 2024年1月1日12時00分のタイムスタンプのレコード

価値
2 b2
3 a3
import dlt

@dlt.view(name="source")
def source():
 return spark.read.table("mycatalog.myschema.mytable")

dlt.create_streaming_table("target")

dlt.create_auto_cdc_from_snapshot_flow(
 target="target",
 source="source",
 keys=["key"],
 stored_as_scd_type=2
)

スナップショットの処理後、ターゲット テーブルには次のレコードが含まれます。

価値 __START_AT __END_AT
1 a1 2024年1月1日 00時00分00秒 2024-01-01 12:00:00
2 a2 2024年1月1日 00時00分00秒 2024-01-01 12:00:00
2 b2 2024-01-01 12:00:00 無効
3 a3 2024-01-01 12:00:00 無効

例: スナップショット処理の履歴

次の例は、クラウド ストレージ システムに格納されている 2 つのスナップショットのソース イベントに基づいてターゲット テーブルを更新する SCD タイプ 2 の処理を示しています。

timestampに格納されているスナップショット/<PATH>/filename1.csv

トラッキングカラム 追跡しない列
1 a1 b1
2 a2 b2
4 a4 b4

timestamp + 5に格納されているスナップショット/<PATH>/filename2.csv

トラッキングカラム 追跡しない列
2 a2_new b2
3 a3 b3
4 a4 b4_new

次のコード例は、これらのスナップショットを使用した SCD タイプ 2 の更新の処理を示しています。

import dlt

def exist(file_name):
  # Storage system-dependent function that returns true if file_name exists, false otherwise

# This function returns a tuple, where the first value is a DataFrame containing the snapshot
# records to process, and the second value is the snapshot version representing the logical
# order of the snapshot.
# Returns None if no snapshot exists.
def next_snapshot_and_version(latest_snapshot_version):
  latest_snapshot_version = latest_snapshot_version or 0
  next_version = latest_snapshot_version + 1
  file_name = "dir_path/filename_" + next_version + ".csv"
  if (exist(file_name)):
    return (spark.read.load(file_name), next_version)
   else:
     # No snapshot available
     return None

dlt.create_streaming_live_table("target")

dlt.create_auto_cdc_from_snapshot_flow(
  target = "target",
  source = next_snapshot_and_version,
  keys = ["Key"],
  stored_as_scd_type = 2,
  track_history_column_list = ["TrackingCol"]
)

スナップショットの処理後、ターゲット テーブルには次のレコードが含まれます。

トラッキングカラム 追跡しない列 __START_AT __END_AT
1 a1 b1 1 2
2 a2 b2 1 2
2 a2_new b2 2 無効
3 a3 b3 2 無効
4 a4 b4_new 1 無効

ターゲット ストリーミング テーブル内のデータを追加、変更、または削除する

パイプラインでテーブルを Unity Catalog に発行する場合は、挿入、更新、削除、マージステートメントなどの データ操作言語 (DML) ステートメントを使用して、 AUTO CDC ... INTO ステートメントによって作成されたターゲット ストリーミング テーブルを変更できます。

  • ストリーミング テーブルのテーブル スキーマを変更する DML ステートメントはサポートされていません。 DML ステートメントがテーブル スキーマの進化を試みないことを確認します。
  • ストリーミング テーブルを更新する DML ステートメントは、Databricks Runtime 13.3 LTS 以降を使用する共有 Unity Catalog クラスターまたは SQL ウェアハウスでのみ実行できます。
  • ストリーミングには追加専用のデータソースが必要なため、処理で (DML ステートメントなどによる) 変更を伴うソース ストリーミング テーブルからのストリーミングが必要な場合は、ソース ストリーミング テーブルを読み取るときに skipChangeCommits フラグを設定します。 skipChangeCommits が設定されていれば、ソース テーブルのレコードを削除または変更するトランザクションは無視されます。 処理にストリーミング テーブルが必要ない場合は、具体化されたビュー (追加専用の制限がない) をターゲット テーブルとして使用できます。

Lakeflow 宣言パイプラインは、指定した SEQUENCE BY 列を使用し、ターゲット テーブルの __START_AT 列と __END_AT 列に適切なシーケンス値を伝達するため (SCD 型 2 の場合)、DML ステートメントでこれらの列の有効な値を使用して、レコードの適切な順序を維持する必要があります。 「AUTO CDC API を使用して CDC を実装する方法」を参照してください。

ストリーミング テーブルで DML ステートメントを使用する方法の詳細については、「ストリーミング テーブルのデータの追加、変更、または削除」を参照してください。

次の例では、開始シーケンスが 5 のアクティブ なレコードを挿入します。

INSERT INTO my_streaming_table (id, name, __START_AT, __END_AT) VALUES (123, 'John Doe', 5, NULL);

AUTO CDC ターゲット テーブルから変更データ フィードを読み取る

Databricks Runtime 15.2 以降では、他の Delta テーブルから変更データ フィードを読み取るのと同じ方法で、 AUTO CDC クエリまたは AUTO CDC FROM SNAPSHOT クエリのターゲットであるストリーミング テーブルから変更データ フィードを読み取ることができます。 ターゲット ストリーミング テーブルから変更データ フィードを読み取る場合は、次のものが必要です。

  • ターゲット ストリーミング テーブルは、Unity カタログに発行する必要があります。 「Lakeflow 宣言パイプラインで Unity カタログを使用する」を参照してください。
  • ターゲット ストリーミング テーブルから変更データ フィードを読み取る場合は、Databricks Runtime 15.2 以降を使用する必要があります。 別のパイプラインで変更データ フィードを読み取るために、Databricks Runtime 15.2 以降を使用するようにパイプラインを構成する必要があります。

他の Delta テーブルから変更データ フィードを読み取るのと同じ方法で、Lakeflow 宣言パイプラインで作成されたターゲット ストリーミング テーブルから変更データ フィードを読み取ります。 PythonとSQLの例を含め、Delta変更データフィード機能の詳細については、Azure DatabricksでのDelta Lake変更データフィードの使用をご覧ください。

変更データ フィード レコードには、変更イベントの種類を識別する メタデータ が含まれています。 テーブル内のレコードが更新されると、関連付けられている変更レコードのメタデータには、通常、_change_typeイベントとupdate_preimageイベントに設定されたupdate_postimage値が含まれます。

ただし、 _change_type 値は、主キー値の変更を含むターゲット ストリーミング テーブルに対して更新が行われる場合は異なります。 変更に主キーの更新が含まれている場合、 _change_type メタデータ フィールドはイベントの insertdelete に設定されます。 主キーに対する変更は、 UPDATE または MERGE ステートメントを使用していずれかのキー フィールドに対して手動で更新を行った場合、または SCD タイプ 2 テーブルの場合、 __start_at フィールドが以前の開始シーケンス値を反映するように変更された場合に発生する可能性があります。

AUTO CDC クエリは、SCD タイプ 1 と SCD タイプ 2 の処理で異なる主キー値を決定します。

  • SCD タイプ 1 の処理と Lakeflow 宣言パイプライン Python インターフェイスの場合、主キーは keys 関数の create_auto_cdc_flow() パラメーターの値です。 Lakeflow 宣言パイプライン SQL インターフェイスの主キーは、KEYS ステートメントの AUTO CDC ... INTO 句によって定義された列です。
  • SCD 型 2 の場合、主キーは、 keys パラメーターまたは KEYS 句に coalesce(__START_AT, __END_AT) 操作からの戻り値を加えたものになります。ここで、 __START_AT__END_AT はターゲット ストリーミング テーブルの対応する列です。

Lakeflow 宣言パイプライン CDC クエリによって処理されたレコードに関するデータを取得する

次のメトリックは、AUTO CDC クエリではなく、AUTO CDC FROM SNAPSHOT クエリによってのみキャプチャされます。

次のメトリックは、 AUTO CDC クエリによってキャプチャされます。

  • num_upserted_rows: 更新中にデータセットにアップサートされた出力行の数。
  • num_deleted_rows: 更新中にデータセットから削除された既存の出力行の数。

メトリック (非 CDC フローの出力) は、 クエリではキャプチャされません。

Lakeflow デクラレーティブ パイプラインのCDC処理に使用されるデータオブジェクトは何ですか?

  • これらのデータ構造は、AUTO CDC処理ではなく、AUTO CDC FROM SNAPSHOT処理にのみ適用されます。
  • これらのデータ構造は、ターゲット テーブルが Hive メタストアに発行されている場合にのみ適用されます。 パイプラインが Unity カタログに発行されると、内部バッキング テーブルにユーザーがアクセスできなくなります。

Hive メタストアでターゲット テーブルを宣言すると、次の 2 つのデータ構造が作成されます。

  • ターゲット テーブルに割り当てられた名前を使用するビュー。
  • CDC 処理を管理するために Lakeflow の宣言型パイプラインによって使用される内部のバックテーブル。 このテーブルの名前は、ターゲット テーブル名に対するプリペンド __apply_changes_storage_ によって指定されます。

たとえば、 dlt_cdc_targetという名前のターゲット テーブルを宣言すると、 dlt_cdc_target という名前のビューと、メタストアに __apply_changes_storage_dlt_cdc_target という名前のテーブルが表示されます。 ビューを作成すると、Lakeflow 宣言型パイプラインは、順序が不正なデータを処理するために必要な追加情報(たとえば、墓標やバージョン)を除外できます。 処理されたデータを表示するには、ターゲット ビューに対してクエリを実行します。 __apply_changes_storage_ テーブルのスキーマは、将来の機能や機能強化をサポートするために変更される可能性があるため、運用環境で使用するためにテーブルのクエリを実行しないでください。 テーブルにデータを手動で追加した場合、バージョン列がないため、レコードは他の変更よりも前にあると見なされます。

その他のリソース