AUTO CDC ... INTO ステートメントを使用して、Lakeflow Spark 宣言パイプライン変更データ キャプチャ (CDC) 機能を使用するフローを作成します。 このステートメントは、CDC ソースから変更を読み取り、ストリーミング ターゲットに適用します。
- CDC の詳細については、「 変更データ キャプチャ (CDC) とは」を参照してください。
-
AUTO CDCの使用の詳細については、「AUTO CDC API: パイプラインを使用して変更データ キャプチャを簡略化する」を参照してください。 -
CREATE FLOWの詳細については、「CREATE FLOW (パイプライン)」を参照してください。
構文
CREATE OR REFRESH STREAMING TABLE table_name;
CREATE FLOW flow_name AS AUTO CDC INTO table_name
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]
ターゲットのデータ品質制約は、他のパイプライン クエリと同じ CONSTRAINT 句を使用して定義します。 パイプラインの期待を使用してデータ品質を管理する方法については、を参照してください。
INSERTイベントとUPDATEイベントの既定の動作は、ソースから CDC イベントをアップサートすることです。指定したキーに一致するターゲット テーブル内のすべての行を更新するか、一致するレコードがターゲット テーブルに存在しない場合は新しい行を挿入します。
DELETEイベントの処理は、APPLY AS DELETE WHEN条件で指定できます。
Important
変更を適用するには、ターゲット ストリーミング テーブルを宣言する必要があります。 必要に応じて、ターゲット テーブルのスキーマを指定できます。 SCD タイプ 2 テーブルの場合、ターゲット テーブルのスキーマを指定する場合は、__START_AT フィールドと同じデータ型の__END_AT列とsequence_by列も含める必要があります。
「AUTO CDC API: パイプラインを使用して変更データ キャプチャを簡略化する」を参照してください。
パラメーター
flow_name作成するフローの名前。
sourceデータのソース。 ソースは ストリーミング ソースである必要があります。 STREAM キーワードを使用して、ストリーミング セマンティクスを使用してソースから読み取ります。 読み取りで既存のレコードの変更または削除が発生した場合は、エラーがスローされます。 静的ソースまたは追加専用ソースから読み取るのが最も安全です。 変更コミットがあるデータを取り込むには、Python と
SkipChangeCommitsオプションを使用してエラーを処理できます。ストリーミング データの詳細については、「パイプラインを使用してデータを変換する」を参照してください。
KEYSソース データ内の行を一意に識別する列または列の組み合わせ。 これらの列の値は、ターゲット テーブル内の特定のレコードに適用される CDC イベントを識別するために使用されます。
列の組み合わせを定義するには、列のコンマ区切りのリストを使用します。
この条項は必須です。
IGNORE NULL UPDATESターゲット列のサブセットを含む更新プログラムの取り込みを許可します。 CDC イベントが既存の行と一致し、IGNORE NULL UPDATES が指定されている場合、
null値を持つ列はターゲット内の既存の値を保持します。 これは、null値を持つ入れ子になった列にも適用されます。この句は省略可能です。
既定値は、既存の列を
null値で上書きすることです。APPLY AS DELETE WHENCDC イベントをアップサートではなく
DELETEとして扱うタイミングを指定します。SCD タイプ 2 のソースでは、順序が正しく指定されていないデータを処理するために、削除された行は基になる Delta テーブルの廃棄石として一時的に保持され、これらの廃棄石を除外するビューがメタストアに作成されます。 保持間隔は、
pipelines.cdc.tombstoneGCThresholdInSecondstable プロパティを使用して構成できます。この句は省略可能です。
APPLY AS TRUNCATE WHENCDC イベントを完全なテーブル
TRUNCATEとして扱うタイミングを指定します。 この句はターゲット テーブルの完全な切り捨てをトリガーするため、この機能を必要とする特定のユース ケースにのみ使用する必要があります。APPLY AS TRUNCATE WHEN句は、SCD 型 1 でのみサポートされます。 SCD タイプ 2 では、切り捨て操作はサポートされていません。この句は省略可能です。
SEQUENCE BYソース データ内の CDC イベントの論理順序を指定する列名。 パイプライン処理では、このシーケンス処理を使用して、順不同に到着した変更イベントを処理します。
シーケンス処理に複数の列が必要な場合は、
STRUCT式を使用します。最初に最初の構造体フィールドで順序付けを行い、同じ値の場合は次に2番目のフィールドで並べ替えを行います。指定された列は、並べ替え可能なデータ型である必要があります。
この句は必須です。
COLUMNSターゲット テーブルに含める列のサブセットを指定します。 次のいずれかを実行できます。
- 含める列の完全な一覧を指定します:
COLUMNS (userId, name, city)。 - 除外する列の一覧を指定します。
COLUMNS * EXCEPT (operation, sequenceNum)
この句は省略可能です。
既定では、
COLUMNS句が指定されていない場合は、ターゲット テーブル内のすべての列が含まれます。- 含める列の完全な一覧を指定します:
STORED ASレコードを SCD タイプ 1 または SCD タイプ 2 として格納するかどうか。
この句は省略可能です。
既定値は SCD タイプ 1 です。
TRACK HISTORY ON指定した列に変更がある場合に履歴レコードを生成する出力列のサブセットを指定します。 次のいずれかを実行できます。
- 追跡する列の完全な一覧を指定します:
COLUMNS (userId, name, city)。 - 追跡から除外する列の一覧を指定します。
COLUMNS * EXCEPT (operation, sequenceNum)
この句は省略可能です。 既定では、
TRACK HISTORY ON *と同等の変更がある場合に、すべての出力列の履歴を追跡します。- 追跡する列の完全な一覧を指定します:
例示
-- Create a streaming table, then use AUTO CDC to populate it:
CREATE OR REFRESH STREAMING TABLE target;
CREATE FLOW flow
AS AUTO CDC INTO
target
FROM stream(cdc_data.users)
KEYS (userId)
APPLY AS DELETE WHEN operation = "DELETE"
SEQUENCE BY sequenceNum
COLUMNS * EXCEPT (operation, sequenceNum)
STORED AS SCD TYPE 2
TRACK HISTORY ON * EXCEPT (city);