Lakeflow 宣言パイプラインは、 AUTO CDC
API と AUTO CDC FROM SNAPSHOT
API を使用して変更データ キャプチャ (CDC) を簡略化します。
注
AUTO CDC
API は、以前は APPLY CHANGES
と呼ばれ、同じ構文を持っていました。
使用するインターフェイスは、変更データのソースによって異なります。
- 変更データ フィード (CDF) からの変更を処理するには、
AUTO CDC
を使用します。 -
AUTO CDC FROM SNAPSHOT
(パブリック プレビュー、および Python でのみ使用可能) を使用して、データベース スナップショットの変更を処理します。
以前は、 MERGE INTO
ステートメントは、Azure Databricks で CDC レコードを処理するために一般的に使用されていました。 ただし、 MERGE INTO
は、レコードの順序が正しくないために正しくない結果が生成される場合や、レコードの順序を再作成するための複雑なロジックが必要になる場合があります。
AUTO CDC
API は、Lakeflow 宣言パイプラインの SQL および Python インターフェイスでサポートされています。
AUTO CDC FROM SNAPSHOT
API は、Lakeflow 宣言パイプライン Python インターフェイスでサポートされています。
AUTO CDC
とAUTO CDC FROM SNAPSHOT
の両方で、SCD タイプ 1 とタイプ 2 を使用したテーブルの更新がサポートされます。
- SCD タイプ 1 を使用して、レコードを直接更新します。 更新されたレコードの履歴は保持されません。
- SCD タイプ 2 を使用して、すべての更新または指定された列セットの更新時にレコードの履歴を保持します。
構文などの参照については、Lakeflow 宣言型パイプライン SQL の AUTO CDC、Lakeflow 宣言型パイプライン Python の AUTO CDC、およびLakeflow 宣言型パイプライン Python の AUTO CDC FROM SNAPSHOTを参照してください。
注
この記事では、ソース データの変更に基づいて、Lakeflow 宣言パイプラインのテーブルを更新する方法について説明します。 Delta テーブルの行レベルの変更情報を記録およびクエリする方法については、「 Azure Databricks での Delta Lake 変更データ フィードの使用」を参照してください。
必要条件
CDC API を使用するには、パイプラインを serverlessLakeflow 宣言型パイプライン、Lakeflow 宣言型パイプライン、または Advanced
エディション のいずれかを使用するように構成する必要があります。
CDC は AUTO CDC API でどのように実装されますか?
Lakeflow 宣言パイプラインの AUTO CDC API は、シーケンス外のレコードを自動的に処理することで、CDC レコードの正しい処理を保証し、シーケンス外レコードを処理するための複雑なロジックを開発する必要がなくなります。 ソース データ内でレコードをシーケンスする列を指定する必要があります。この列は、Lakeflow 宣言パイプラインがソース データの適切な順序を単調に増加させる表現として解釈します。 Lakeflow 宣言型パイプラインは、順不同で到着するデータを自動的に処理します。 SCD タイプ 2 の変更の場合、Lakeflow 宣言パイプラインは、適切なシーケンス値をターゲット テーブルの __START_AT
列と __END_AT
列に伝達します。 各シーケンス値にはキーごとに 1 つの個別の更新が必要であり、NULL シーケンス値はサポートされていません。
AUTO CDC
を使用して CDC 処理を実行するには、まずストリーミング テーブルを作成し、次に SQL の AUTO CDC ... INTO
ステートメントまたは Python の create_auto_cdc_flow()
関数を使用して、変更フィードのソース、キー、シーケンスを指定します。 ターゲット ストリーミング テーブルを作成するには、SQL の CREATE OR REFRESH STREAMING TABLE
ステートメントまたは Python の create_streaming_table()
関数を使用します。
SCD タイプ 1 およびタイプ 2 の処理例を参照してください。
構文の詳細については、Lakeflow 宣言型パイプライン の SQL リファレンス または Python リファレンスを参照してください。
API を使用して CDC を実装する方法
重要
AUTO CDC FROM SNAPSHOT
API はパブリック プレビュー段階です。
AUTO CDC FROM SNAPSHOT
は、一連のインオーダー スナップショットを比較してソース データの変更を効率的に判断し、スナップショット内のレコードの CDC 処理に必要な処理を実行する宣言型 API です。
AUTO CDC FROM SNAPSHOT
は、Lakeflow 宣言型パイプライン Python インターフェイスでのみサポートされています。
AUTO CDC FROM SNAPSHOT
では、複数のソースの種類からのスナップショットの取り込みがサポートされています。
- 定期的なスナップショット インジェストを使用して、既存のテーブルまたはビューからスナップショットを取り込みます。
AUTO CDC FROM SNAPSHOT
には、既存のデータベース オブジェクトからのスナップショットの定期的な取り込みをサポートするシンプルで合理化されたインターフェイスがあります。 パイプラインの更新ごとに新しいスナップショットが取り込まれており、取り込み時間がスナップショット バージョンとして使用されます。 パイプラインが連続モードで実行されると、処理を含むフローのAUTO CDC FROM SNAPSHOT
設定によって決定された期間に、パイプラインの更新ごとに複数のスナップショットが取り込まれます。 - 履歴スナップショット インジェストを使用して、Oracle または MySQL データベースまたはデータ ウェアハウスから生成されたスナップショットなど、データベース スナップショットを含むファイルを処理します。
AUTO CDC FROM SNAPSHOT
を使用して任意のソースの種類から CDC 処理を実行するには、まずストリーミング テーブルを作成してから、Python で create_auto_cdc_from_snapshot_flow()
関数を使用して、処理を実装するために必要なスナップショット、キー、およびその他の引数を指定します。
定期的なスナップショット インジェストとスナップショット インジェストの履歴の例を参照してください。
API に渡されるスナップショットは、バージョン別の昇順である必要があります。 Lakeflow 宣言型パイプラインで順序が不正なスナップショットが検出されると、エラーが発生します。
構文の詳細については、Lakeflow 宣言型パイプライン の Python リファレンスを参照してください。
シーケンス処理に複数の列を使用する
複数の列を順番に並べ替えること (例: タイムスタンプと ID を使用した順序付け) ができます。STRUCT を使用するとそれらを組み合わせられます。最初に STRUCT の最初のフィールドで順序付けを開始し、同じ順番が表れた場合は 2 番目のフィールドを考慮します。
SQL の例:
SEQUENCE BY STRUCT(timestamp_col, id_col)
Python の例:
sequence_by = struct("timestamp_col", "id_col")
制限事項
シーケンス処理に使用する列は、並べ替え可能なデータ型である必要があります。
例: CDF ソース・データを使用した SCD タイプ 1 および SCD タイプ 2 の処理
次のセクションでは、変更データ フィードからのソース イベントに基づいてターゲット テーブルを更新する Lakeflow 宣言型パイプライン SCD タイプ 1 およびタイプ 2 のクエリの例を示します。
- 新しいユーザー レコードを作成します。
- ユーザー レコードを削除します。
- ユーザー レコードを更新します。 SCD タイプ 1 の例では、最後の
UPDATE
操作は遅れて到着し、ターゲット テーブルから削除され、順序が不順のイベントの処理を示しています。
次の例では、Lakeflow 宣言パイプラインの構成と更新に関する知識を前提としています。 「チュートリアル: Lakeflow 宣言型パイプラインを使用して変更データ キャプチャを使用して ETL パイプラインを構築する」を参照してください。
これらの例を実行するには、まずサンプル データセットを作成する必要があります。 「テスト データの生成」を参照してください。
これらの例の入力レコードを次に示します。
ユーザーID | 名前 | 都市 | 運営 | シーケンス番号 |
---|---|---|---|---|
124 | ラウル | オアハカ州 | INSERT | 1 |
123 | イザベル | モンテレー | INSERT | 1 |
125 | メルセデス | ティファナ | INSERT | 2 |
126 | リリー | カンクン | INSERT | 2 |
123 | 無効 | 無効 | 削除 | 6 |
125 | メルセデス | グアダラハラ | UPDATE | 6 |
125 | メルセデス | メヒカリ | UPDATE | 5 |
123 | イザベル | チワワ | UPDATE | 5 |
サンプル データの最後の行のコメントを解除すると、レコードを切り捨てる場所を指定する次のレコードが挿入されます。
ユーザーID | 名前 | 都市 | 運営 | シーケンス番号 |
---|---|---|---|---|
無効 | 無効 | 無効 | 切り捨てる | 3 |
注
次の例には、 DELETE
操作と TRUNCATE
操作の両方を指定するオプションが含まれていますが、それぞれは省略可能です。
SCD タイプ 1 の更新を処理する
次の例では、SCD タイプ 1 の更新の処理を示します。
Python(プログラミング言語)
import dlt
from pyspark.sql.functions import col, expr
@dlt.view
def users():
return spark.readStream.table("cdc_data.users")
dlt.create_streaming_table("target")
dlt.create_auto_cdc_flow(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
apply_as_truncates = expr("operation = 'TRUNCATE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = 1
)
SQL
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;
CREATE FLOW flowname AS AUTO CDC INTO
target
FROM
stream(cdc_data.users)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
APPLY AS TRUNCATE WHEN
operation = "TRUNCATE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 1;
SCD タイプ 1 の例を実行した後、ターゲット テーブルには次のレコードが含まれます。
ユーザーID | 名前 | 都市 |
---|---|---|
124 | ラウル | オアハカ州 |
125 | メルセデス | グアダラハラ |
126 | リリー | カンクン |
追加のTRUNCATE
レコードで SCD タイプ 1 の例を実行すると、124
での126
操作により、レコードTRUNCATE
およびsequenceNum=3
が切り捨てられ、ターゲット テーブルに次のレコードが含まれます。
ユーザーID | 名前 | 都市 |
---|---|---|
125 | メルセデス | グアダラハラ |
SCD タイプ 2 の更新を処理する
次の例は、SCD タイプ 2 の更新の処理を示しています。
Python(プログラミング言語)
import dlt
from pyspark.sql.functions import col, expr
@dlt.view
def users():
return spark.readStream.table("cdc_data.users")
dlt.create_streaming_table("target")
dlt.create_auto_cdc_flow(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2"
)
SQL
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;
CREATE FLOW target_flow
AS AUTO CDC INTO
target
FROM
stream(cdc_data.users)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 2;
SCD タイプ 2 の例を実行した後、ターゲット テーブルには次のレコードが含まれます。
ユーザーID | 名前 | 都市 | __START_AT | __END_AT |
---|---|---|---|---|
123 | イザベル | モンテレー | 1 | 5 |
123 | イザベル | チワワ | 5 | 6 |
124 | ラウル | オアハカ州 | 1 | 無効 |
125 | メルセデス | ティファナ | 2 | 5 |
125 | メルセデス | メヒカリ | 5 | 6 |
125 | メルセデス | グアダラハラ | 6 | 無効 |
126 | リリー | カンクン | 2 | 無効 |
SCD タイプ 2 クエリでは、ターゲット テーブルの履歴に対して追跡する出力列のサブセットを指定することもできます。 他の列への変更は、新しい履歴レコードを生成するのではなく、その場で更新されます。 次の例では、 city
列を追跡から除外する方法を示します。
次の例では、SCD タイプ 2 でトラック履歴を使用する方法を示します。
Python(プログラミング言語)
import dlt
from pyspark.sql.functions import col, expr
@dlt.view
def users():
return spark.readStream.table("cdc_data.users")
dlt.create_streaming_table("target")
dlt.create_auto_cdc_flow(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2",
track_history_except_column_list = ["city"]
)
SQL
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;
CREATE FLOW target_flow
AS AUTO CDC INTO
target
FROM
stream(cdc_data.users)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 2
TRACK HISTORY ON * EXCEPT
(city)
追加の TRUNCATE
レコードなしでこの例を実行した後、ターゲット テーブルには次のレコードが含まれます。
ユーザーID | 名前 | 都市 | __START_AT | __END_AT |
---|---|---|---|---|
123 | イザベル | チワワ | 1 | 6 |
124 | ラウル | オアハカ州 | 1 | 無効 |
125 | メルセデス | グアダラハラ | 2 | 無効 |
126 | リリー | カンクン | 2 | 無効 |
テスト データを生成する
次のコードは、このチュートリアルに存在するサンプル クエリで使用するサンプル データセットを生成するために提供されています。 新しいスキーマを作成して新しいテーブルを作成するための適切な資格情報があると仮定すると、ノートブックまたは Databricks SQL でこれらのステートメントを実行できます。 次のコードは、Lakeflow 宣言パイプラインの一部として実行することを意図 していません 。
CREATE SCHEMA IF NOT EXISTS cdc_data;
CREATE TABLE
cdc_data.users
AS SELECT
col1 AS userId,
col2 AS name,
col3 AS city,
col4 AS operation,
col5 AS sequenceNum
FROM (
VALUES
-- Initial load.
(124, "Raul", "Oaxaca", "INSERT", 1),
(123, "Isabel", "Monterrey", "INSERT", 1),
-- New users.
(125, "Mercedes", "Tijuana", "INSERT", 2),
(126, "Lily", "Cancun", "INSERT", 2),
-- Isabel is removed from the system and Mercedes moved to Guadalajara.
(123, null, null, "DELETE", 6),
(125, "Mercedes", "Guadalajara", "UPDATE", 6),
-- This batch of updates arrived out of order. The above batch at sequenceNum 6 will be the final state.
(125, "Mercedes", "Mexicali", "UPDATE", 5),
(123, "Isabel", "Chihuahua", "UPDATE", 5)
-- Uncomment to test TRUNCATE.
-- ,(null, null, null, "TRUNCATE", 3)
);
例: 定期的なスナップショット処理
次の例では、 mycatalog.myschema.mytable
に格納されているテーブルのスナップショットを取り込む SCD タイプ 2 の処理を示します。 処理の結果は、 target
という名前のテーブルに書き込まれます。
タイムスタンプ 2024-01-01 00:00:00 の mycatalog.myschema.mytable
レコード
鍵 | 価値 |
---|---|
1 | a1 |
2 | a2 |
mycatalog.myschema.mytable
2024年1月1日12時00分のタイムスタンプのレコード
鍵 | 価値 |
---|---|
2 | b2 |
3 | a3 |
import dlt
@dlt.view(name="source")
def source():
return spark.read.table("mycatalog.myschema.mytable")
dlt.create_streaming_table("target")
dlt.create_auto_cdc_from_snapshot_flow(
target="target",
source="source",
keys=["key"],
stored_as_scd_type=2
)
スナップショットの処理後、ターゲット テーブルには次のレコードが含まれます。
鍵 | 価値 | __START_AT | __END_AT |
---|---|---|---|
1 | a1 | 2024年1月1日 00時00分00秒 | 2024-01-01 12:00:00 |
2 | a2 | 2024年1月1日 00時00分00秒 | 2024-01-01 12:00:00 |
2 | b2 | 2024-01-01 12:00:00 | 無効 |
3 | a3 | 2024-01-01 12:00:00 | 無効 |
例: スナップショット処理の履歴
次の例は、クラウド ストレージ システムに格納されている 2 つのスナップショットのソース イベントに基づいてターゲット テーブルを更新する SCD タイプ 2 の処理を示しています。
timestamp
に格納されているスナップショット/<PATH>/filename1.csv
鍵 | トラッキングカラム | 追跡しない列 |
---|---|---|
1 | a1 | b1 |
2 | a2 | b2 |
4 | a4 | b4 |
timestamp + 5
に格納されているスナップショット/<PATH>/filename2.csv
鍵 | トラッキングカラム | 追跡しない列 |
---|---|---|
2 | a2_new | b2 |
3 | a3 | b3 |
4 | a4 | b4_new |
次のコード例は、これらのスナップショットを使用した SCD タイプ 2 の更新の処理を示しています。
import dlt
def exist(file_name):
# Storage system-dependent function that returns true if file_name exists, false otherwise
# This function returns a tuple, where the first value is a DataFrame containing the snapshot
# records to process, and the second value is the snapshot version representing the logical
# order of the snapshot.
# Returns None if no snapshot exists.
def next_snapshot_and_version(latest_snapshot_version):
latest_snapshot_version = latest_snapshot_version or 0
next_version = latest_snapshot_version + 1
file_name = "dir_path/filename_" + next_version + ".csv"
if (exist(file_name)):
return (spark.read.load(file_name), next_version)
else:
# No snapshot available
return None
dlt.create_streaming_live_table("target")
dlt.create_auto_cdc_from_snapshot_flow(
target = "target",
source = next_snapshot_and_version,
keys = ["Key"],
stored_as_scd_type = 2,
track_history_column_list = ["TrackingCol"]
)
スナップショットの処理後、ターゲット テーブルには次のレコードが含まれます。
鍵 | トラッキングカラム | 追跡しない列 | __START_AT | __END_AT |
---|---|---|---|---|
1 | a1 | b1 | 1 | 2 |
2 | a2 | b2 | 1 | 2 |
2 | a2_new | b2 | 2 | 無効 |
3 | a3 | b3 | 2 | 無効 |
4 | a4 | b4_new | 1 | 無効 |
ターゲット ストリーミング テーブル内のデータを追加、変更、または削除する
パイプラインでテーブルを Unity Catalog に発行する場合は、挿入、更新、削除、マージステートメントなどの データ操作言語 (DML) ステートメントを使用して、 AUTO CDC ... INTO
ステートメントによって作成されたターゲット ストリーミング テーブルを変更できます。
注
- ストリーミング テーブルのテーブル スキーマを変更する DML ステートメントはサポートされていません。 DML ステートメントがテーブル スキーマの進化を試みないことを確認します。
- ストリーミング テーブルを更新する DML ステートメントは、Databricks Runtime 13.3 LTS 以降を使用する共有 Unity Catalog クラスターまたは SQL ウェアハウスでのみ実行できます。
- ストリーミングには追加専用のデータソースが必要なため、処理で (DML ステートメントなどによる) 変更を伴うソース ストリーミング テーブルからのストリーミングが必要な場合は、ソース ストリーミング テーブルを読み取るときに skipChangeCommits フラグを設定します。
skipChangeCommits
が設定されていれば、ソース テーブルのレコードを削除または変更するトランザクションは無視されます。 処理にストリーミング テーブルが必要ない場合は、具体化されたビュー (追加専用の制限がない) をターゲット テーブルとして使用できます。
Lakeflow 宣言パイプラインは、指定した SEQUENCE BY
列を使用し、ターゲット テーブルの __START_AT
列と __END_AT
列に適切なシーケンス値を伝達するため (SCD 型 2 の場合)、DML ステートメントでこれらの列の有効な値を使用して、レコードの適切な順序を維持する必要があります。
「AUTO CDC API を使用して CDC を実装する方法」を参照してください。
ストリーミング テーブルで DML ステートメントを使用する方法の詳細については、「ストリーミング テーブルのデータの追加、変更、または削除」を参照してください。
次の例では、開始シーケンスが 5 のアクティブ なレコードを挿入します。
INSERT INTO my_streaming_table (id, name, __START_AT, __END_AT) VALUES (123, 'John Doe', 5, NULL);
AUTO CDC ターゲット テーブルから変更データ フィードを読み取る
Databricks Runtime 15.2 以降では、他の Delta テーブルから変更データ フィードを読み取るのと同じ方法で、 AUTO CDC
クエリまたは AUTO CDC FROM SNAPSHOT
クエリのターゲットであるストリーミング テーブルから変更データ フィードを読み取ることができます。 ターゲット ストリーミング テーブルから変更データ フィードを読み取る場合は、次のものが必要です。
- ターゲット ストリーミング テーブルは、Unity カタログに発行する必要があります。 「Lakeflow 宣言パイプラインで Unity カタログを使用する」を参照してください。
- ターゲット ストリーミング テーブルから変更データ フィードを読み取る場合は、Databricks Runtime 15.2 以降を使用する必要があります。 別のパイプラインで変更データ フィードを読み取るために、Databricks Runtime 15.2 以降を使用するようにパイプラインを構成する必要があります。
他の Delta テーブルから変更データ フィードを読み取るのと同じ方法で、Lakeflow 宣言パイプラインで作成されたターゲット ストリーミング テーブルから変更データ フィードを読み取ります。 PythonとSQLの例を含め、Delta変更データフィード機能の詳細については、Azure DatabricksでのDelta Lake変更データフィードの使用をご覧ください。
注
変更データ フィード レコードには、変更イベントの種類を識別する メタデータ が含まれています。 テーブル内のレコードが更新されると、関連付けられている変更レコードのメタデータには、通常、_change_type
イベントとupdate_preimage
イベントに設定されたupdate_postimage
値が含まれます。
ただし、 _change_type
値は、主キー値の変更を含むターゲット ストリーミング テーブルに対して更新が行われる場合は異なります。 変更に主キーの更新が含まれている場合、 _change_type
メタデータ フィールドはイベントの insert
と delete
に設定されます。 主キーに対する変更は、 UPDATE
または MERGE
ステートメントを使用していずれかのキー フィールドに対して手動で更新を行った場合、または SCD タイプ 2 テーブルの場合、 __start_at
フィールドが以前の開始シーケンス値を反映するように変更された場合に発生する可能性があります。
AUTO CDC
クエリは、SCD タイプ 1 と SCD タイプ 2 の処理で異なる主キー値を決定します。
- SCD タイプ 1 の処理と Lakeflow 宣言パイプライン Python インターフェイスの場合、主キーは
keys
関数のcreate_auto_cdc_flow()
パラメーターの値です。 Lakeflow 宣言パイプライン SQL インターフェイスの主キーは、KEYS
ステートメントのAUTO CDC ... INTO
句によって定義された列です。 - SCD 型 2 の場合、主キーは、
keys
パラメーターまたはKEYS
句にcoalesce(__START_AT, __END_AT)
操作からの戻り値を加えたものになります。ここで、__START_AT
と__END_AT
はターゲット ストリーミング テーブルの対応する列です。
Lakeflow 宣言パイプライン CDC クエリによって処理されたレコードに関するデータを取得する
注
次のメトリックは、AUTO CDC
クエリではなく、AUTO CDC FROM SNAPSHOT
クエリによってのみキャプチャされます。
次のメトリックは、 AUTO CDC
クエリによってキャプチャされます。
-
num_upserted_rows
: 更新中にデータセットにアップサートされた出力行の数。 -
num_deleted_rows
: 更新中にデータセットから削除された既存の出力行の数。
メトリック (非 CDC フローの出力) は、 クエリではキャプチャされません。
Lakeflow デクラレーティブ パイプラインのCDC処理に使用されるデータオブジェクトは何ですか?
注
- これらのデータ構造は、
AUTO CDC
処理ではなく、AUTO CDC FROM SNAPSHOT
処理にのみ適用されます。 - これらのデータ構造は、ターゲット テーブルが Hive メタストアに発行されている場合にのみ適用されます。 パイプラインが Unity カタログに発行されると、内部バッキング テーブルにユーザーがアクセスできなくなります。
Hive メタストアでターゲット テーブルを宣言すると、次の 2 つのデータ構造が作成されます。
- ターゲット テーブルに割り当てられた名前を使用するビュー。
- CDC 処理を管理するために Lakeflow の宣言型パイプラインによって使用される内部のバックテーブル。 このテーブルの名前は、ターゲット テーブル名に対するプリペンド
__apply_changes_storage_
によって指定されます。
たとえば、 dlt_cdc_target
という名前のターゲット テーブルを宣言すると、 dlt_cdc_target
という名前のビューと、メタストアに __apply_changes_storage_dlt_cdc_target
という名前のテーブルが表示されます。 ビューを作成すると、Lakeflow 宣言型パイプラインは、順序が不正なデータを処理するために必要な追加情報(たとえば、墓標やバージョン)を除外できます。 処理されたデータを表示するには、ターゲット ビューに対してクエリを実行します。
__apply_changes_storage_
テーブルのスキーマは、将来の機能や機能強化をサポートするために変更される可能性があるため、運用環境で使用するためにテーブルのクエリを実行しないでください。 テーブルにデータを手動で追加した場合、バージョン列がないため、レコードは他の変更よりも前にあると見なされます。