Important
この機能は パブリック プレビュー段階です。
create_auto_cdc_from_snapshot_flow関数は、Lakeflow Spark 宣言型パイプライン変更データ キャプチャ (CDC) 機能を使用して、データベース スナップショットからのソース データを処理するフローを作成します。
AUTO CDC FROM SNAPSHOT API を使用して CDC を実装する方法を参照してください。
注
この関数は、前の関数の apply_changes_from_snapshot()を置き換えます。 2 つの関数のシグネチャは同じです。 Databricks では、新しい名前を使用するように更新することをお勧めします。
構文
from pyspark import pipelines as dp
dp.create_auto_cdc_from_snapshot_flow(
target = "<target-table>",
source = Any,
keys = ["key1", "key2", "keyN"],
stored_as_scd_type = <type>,
track_history_column_list = None,
track_history_except_column_list = None
)
注
AUTO CDC FROM SNAPSHOT処理の場合、既定の動作は、同じキーを持つ一致するレコードがターゲットに存在しない場合に新しい行を挿入することです。 一致するレコードが存在する場合は、行の値のいずれかが変更された場合にのみ更新されます。 ターゲットに存在するが、ソースにはもう存在しないキーを持つ行は削除されます。
スナップショットを使用した CDC 処理の詳細については、「 AUTO CDC API: パイプラインを使用して変更データ キャプチャを簡略化する」を参照してください。
create_auto_cdc_from_snapshot_flow()関数の使用例については、定期的なスナップショット インジェストとスナップショット インジェストの履歴の例を参照してください。
パラメーター
| パラメーター | タイプ | Description |
|---|---|---|
target |
str |
必須。 更新するテーブルの名前。
関数を実行する前にcreate_auto_cdc_from_snapshot_flow() 関数を使用してターゲット テーブルを作成できます。 |
source |
str または lambda function |
必須。 定期的にスナップショットを作成するテーブルまたはビューの名前、または処理するスナップショット DataFrame とスナップショット バージョンを返す Python ラムダ関数。
source引数の実装を参照してください。 |
keys |
list |
必須。 ソース データ内の行を一意に識別する列または列の組み合わせ。 これは、ターゲット テーブル内の特定のレコードに適用される CDC イベントを識別するために使用されます。 次のいずれかを指定できます。
|
stored_as_scd_type |
str または int |
レコードを SCD タイプ 1 または SCD タイプ 2 として格納するかどうか。 SCD タイプ 1 の場合は 1 、SCD タイプ 2 の場合は 2 に設定されます。 既定値は SCD タイプ 1 です。 |
track_history_column_list または track_history_except_column_list |
list |
ターゲット テーブル内の履歴の追跡対象となる出力列のサブセット。
track_history_column_listを使用して、追跡する列の完全な一覧を指定します。
track_history_except_column_listを使用して、追跡から除外する列を指定します。 値は、文字列の一覧として宣言することも、Spark SQL col() 関数として宣言することもできます。
col()関数の引数に修飾子を含めることはできません。 たとえば、 col(userId)を使用することはできますが、 col(source.userId)は使用できません。 既定では、 track_history_column_list または track_history_except_column_list 引数が関数に渡されない場合に、ターゲット テーブル内のすべての列が含まれます。 |
source引数を実装する
create_auto_cdc_from_snapshot_flow()関数には、source引数が含まれています。 履歴スナップショットを処理する場合、 source 引数は、処理するスナップショット データとスナップショット バージョンを含む Python DataFrame という 2 つの値を create_auto_cdc_from_snapshot_flow() 関数に返す Python ラムダ関数であることが期待されます。
ラムダ関数のシグネチャを次に示します。
lambda Any => Optional[(DataFrame, Any)]
- ラムダ関数の引数は、最後に処理されたスナップショット バージョンです。
- ラムダ関数の戻り値は、
Noneまたは 2 つの値のタプルです。タプルの最初の値は、処理されるスナップショットを含む DataFrame です。 タプルの 2 番目の値は、スナップショットの論理順序を表すスナップショット バージョンです。
ラムダ関数を実装して呼び出す例:
def next_snapshot_and_version(latest_snapshot_version: Optional[int]) -> Tuple[Optional[int], DataFrame]:
if latest_snapshot_version is None:
return (spark.read.load("filename.csv"), 1)
else:
return None
create_auto_cdc_from_snapshot_flow(
# ...
source = next_snapshot_and_version,
# ...
)
Lakeflow Spark 宣言型パイプライン ランタイムは、 create_auto_cdc_from_snapshot_flow() 関数を含むパイプラインがトリガーされるたびに、次の手順を実行します。
-
next_snapshot_and_version関数を実行して、次のスナップショット DataFrame と対応するスナップショット バージョンを読み込みます。 - DataFrame が返されない場合、実行は終了し、パイプラインの更新は完了としてマークされます。
- 新しいスナップショットの変更を検出し、ターゲット テーブルに増分的に適用します。
- 手順 1 に戻り、次のスナップショットとそのバージョンを読み込みます。