create_auto_cdc_flow()関数は、Lakeflow Spark 宣言型パイプラインの変更データ キャプチャ (CDC) 機能を使用して、変更データ フィード (CDF) からのソース データを処理するフローを作成します。
注
この関数は、前の関数の apply_changes()を置き換えます。 2 つの関数のシグネチャは同じです。 Databricks では、新しい名前を使用するように更新することをお勧めします。
Important
変更を適用するには、ターゲット ストリーミング テーブルを宣言する必要があります。 必要に応じて、ターゲット テーブルのスキーマを指定できます。
create_auto_cdc_flow()ターゲット テーブルのスキーマを指定する場合は、__START_AT フィールドと同じデータ型の__END_AT列とsequence_by列を含める必要があります。
必要なターゲット テーブルを作成するには、パイプライン Python インターフェイスで create_streaming_table() 関数を使用します。
構文
from pyspark import pipelines as dp
dp.create_auto_cdc_flow(
target = "<target-table>",
source = "<data-source>",
keys = ["key1", "key2", "keyN"],
sequence_by = "<sequence-column>",
ignore_null_updates = <bool>,
apply_as_deletes = None,
apply_as_truncates = None,
column_list = None,
except_column_list = None,
stored_as_scd_type = <type>,
track_history_column_list = None,
track_history_except_column_list = None,
name = None,
once = <bool>
)
create_auto_cdc_flow処理の場合、INSERTイベントとUPDATE イベントの既定の動作は、ソースから CDC イベントをアップサートすることです。指定したキーに一致するターゲット テーブル内の行を更新するか、一致するレコードがターゲット テーブルに存在しない場合は新しい行を挿入します。
DELETE イベントの処理は、apply_as_deletes パラメーターを使用して指定できます。
変更フィードを使用した CDC 処理の詳細については、「 AUTO CDC API: パイプラインを使用して変更データ キャプチャを簡略化する」を参照してください。
create_auto_cdc_flow()関数の使用例については、「例: CDF ソース データを使用した SCD タイプ 1 および SCD タイプ 2 の処理」を参照してください。
パラメーター
| パラメーター |
タイプ |
Description |
target |
str |
必須。 更新するテーブルの名前。
関数を実行する前にcreate_auto_cdc_flow() 関数を使用してターゲット テーブルを作成できます。 |
source |
str |
必須。 CDC レコードを含むデータ ソース。 |
keys |
list |
必須。 ソース データ内の行を一意に識別する列または列の組み合わせ。 これは、ターゲット テーブル内の特定のレコードに適用される CDC イベントを識別するために使用されます。 次のいずれかを指定できます。
- 文字列の一覧:
["userId", "orderId"] - Spark SQL
col() 関数の一覧: [col("userId"), col("orderId")]。
col()関数の引数に修飾子を含めることはできません。 たとえば、 col(userId)を使用することはできますが、 col(source.userId)は使用できません。
|
sequence_by |
str、col()、または struct() |
必須。 ソース データ内の CDC イベントの論理順序を指定する列名。 Lakeflow Spark 宣言型パイプラインでは、このシーケンス処理を使用して、順不同に到着した変更イベントを処理します。 指定された列は、並べ替え可能なデータ型である必要があります。 次のいずれかを指定できます。
- 文字列:
"sequenceNum" - Spark SQL
col() 関数: col("sequenceNum")。
col()関数の引数に修飾子を含めることはできません。 たとえば、 col(userId)を使用することはできますが、 col(source.userId)は使用できません。 - 複数の列を組み合わせて同点を解消する
struct() : struct("timestamp_col", "id_col")、最初に構造体フィールドで並べ替え、タイの場合は次に2番目のフィールドで並べ替えるようにします。
|
ignore_null_updates |
bool |
ターゲット列のサブセットを含む更新プログラムの取り込みを許可します。 CDC イベントが既存の行と一致し、 ignore_null_updates が Trueされると、 null を持つ列はターゲット内の既存の値を保持します。 これは、値が null の入れ子になった列にも適用されます。
ignore_null_updatesがFalseされると、既存の値はnull値で上書きされます。 既定値は Falseです。 |
apply_as_deletes |
str または expr() |
CDC イベントをアップサートではなく DELETE として扱うタイミングを指定します。 次のいずれかを指定できます。
- 文字列:
"Operation = 'DELETE'" - Spark SQL
expr() 関数: expr("Operation = 'DELETE'")
順序が正しく設定されていないデータを処理するために、削除された行は一時的に基になる Delta テーブルの廃棄石として保持され、これらの廃棄石を除外するビューがメタストアに作成されます。 保持間隔の既定値は 2 日間で、 pipelines.cdc.tombstoneGCThresholdInSeconds テーブル プロパティを使用して構成できます。 |
apply_as_truncates |
str または expr() |
CDC イベントを完全なテーブル TRUNCATEとして扱うタイミングを指定します。 次のいずれかを指定できます。
- 文字列:
"Operation = 'TRUNCATE'" - Spark SQL
expr() 関数: expr("Operation = 'TRUNCATE'")
この句はターゲット テーブルの完全な切り捨てをトリガーするため、この機能を必要とする特定のユース ケースにのみ使用する必要があります。
apply_as_truncates パラメーターは、SCD タイプ 1 でのみサポートされます。 SCD タイプ 2 では、切り捨て操作はサポートされていません。 |
column_list または except_column_list |
list |
ターゲット テーブルに含める列のサブセット。
column_listを使用して、含める列の完全な一覧を指定します。
except_column_listを使用して、除外する列を指定します。 値は、文字列の一覧として宣言することも、Spark SQL col() 関数として宣言することもできます。
column_list = ["userId", "name", "city"]column_list = [col("userId"), col("name"), col("city")]except_column_list = ["operation", "sequenceNum"]except_column_list = [col("operation"), col("sequenceNum")
col()関数の引数に修飾子を含めることはできません。 たとえば、 col(userId)を使用することはできますが、 col(source.userId)は使用できません。 既定では、 column_list または except_column_list 引数が関数に渡されない場合に、ターゲット テーブル内のすべての列が含まれます。 |
stored_as_scd_type |
str または int |
レコードを SCD タイプ 1 または SCD タイプ 2 として格納するかどうか。 SCD タイプ 1 の場合は 1 、SCD タイプ 2 の場合は 2 に設定されます。 既定値は SCD タイプ 1 です。 |
track_history_column_list または track_history_except_column_list |
list |
ターゲット テーブル内の履歴の追跡対象となる出力列のサブセット。
track_history_column_listを使用して、追跡する列の完全な一覧を指定します。
track_history_except_column_listを使用して、追跡から除外する列を指定します。 値は、文字列の一覧として宣言することも、Spark SQL col() 関数として宣言することもできます。
track_history_column_list = ["userId", "name", "city"]track_history_column_list = [col("userId"), col("name"), col("city")]track_history_except_column_list = ["operation", "sequenceNum"]track_history_except_column_list = [col("operation"), col("sequenceNum")
col()関数の引数に修飾子を含めることはできません。 たとえば、 col(userId)を使用することはできますが、 col(source.userId)は使用できません。 既定では、 track_history_column_list または track_history_except_column_list 引数が関数に渡されない場合に、ターゲット テーブル内のすべての列が含まれます。 |
name |
str |
フロー名。 指定しない場合、既定値は target と同じ値になります。 |
once |
bool |
必要に応じて、バックフィルなどの 1 回限りのフローとしてフローを定義します。
once=Trueを使用すると、次の 2 つの方法でフローが変更されます。
- 戻り値。
streaming-query。 この場合は、ストリーミング DataFrame ではなく、バッチ DataFrame である必要があります。 - フローは既定で 1 回実行されます。 パイプラインが完全な更新で更新された場合、
ONCE フローが再度実行され、データが再作成されます。
|