이 함수는 create_auto_cdc_flow() Lakeflow Spark 선언적 파이프라인 CDC(변경 데이터 캡처) 기능을 사용하여 CDF(변경 데이터 피드)에서 원본 데이터를 처리하는 흐름을 만듭니다.
비고
이 함수는 이전 함수 apply_changes()를 대체합니다. 두 함수의 서명은 동일합니다. Databricks는 새 이름을 사용하도록 업데이트하는 것이 좋습니다.
중요합니다
변경 내용을 적용하려면 대상 스트리밍 테이블을 선언해야 합니다. 필요에 따라 대상 테이블에 대한 스키마를 지정할 수 있습니다. 대상 테이블의 스키마를 지정할 때, create_auto_cdc_flow() 열과 __START_AT 열을 __END_AT 필드와 동일한 데이터 형식으로 포함해야 합니다.
필요한 대상 테이블을 만들려면 파이프라인 Python 인터페이스에서 create_streaming_table() 함수를 사용할 수 있습니다.
Syntax
from pyspark import pipelines as dp
dp.create_auto_cdc_flow(
target = "<target-table>",
source = "<data-source>",
keys = ["key1", "key2", "keyN"],
sequence_by = "<sequence-column>",
ignore_null_updates = <bool>,
apply_as_deletes = None,
apply_as_truncates = None,
column_list = None,
except_column_list = None,
stored_as_scd_type = <type>,
track_history_column_list = None,
track_history_except_column_list = None,
name = None,
once = <bool>
)
처리를 위해 create_auto_cdc_flow 이벤트에 대한 INSERTUPDATE 기본 동작은 원본에서 CDC 이벤트를 upsert 하는 것입니다. 지정된 키와 일치하는 대상 테이블의 행을 업데이트하거나 대상 테이블에 일치하는 레코드가 없을 때 새 행을 삽입합니다.
DELETE 이벤트 처리는 apply_as_deletes 매개 변수를 사용하여 지정할 수 있습니다.
변경 피드를 사용한 CDC 처리에 대한 자세한 내용은 AUTO CDC API: 파이프라인을 사용하여 변경 데이터 캡처 간소화를 참조하세요. 함수를 사용하는 create_auto_cdc_flow() 예제는 예제: CDF 원본 데이터를 사용한 SCD 형식 1 및 SCD 형식 2 처리를 참조하세요.
매개 변수
| 매개 변수 | 유형 | Description |
|---|---|---|
target |
str |
필수 사항입니다. 업데이트할 테이블의 이름입니다. 함수를 실행하기 전에 create_streaming_table() 함수를 사용하여 대상 테이블을 만들 수 있습니다 create_auto_cdc_flow() . |
source |
str |
필수 사항입니다. CDC 레코드를 포함하는 데이터 원본입니다. |
keys |
list |
필수 사항입니다. 원본 데이터의 행을 고유하게 식별하는 열 또는 열의 조합입니다. 이는 대상 테이블의 특정 레코드에 적용되는 CDC 이벤트를 식별하는 데 사용됩니다. 다음 중 하나를 지정할 수 있습니다.
|
sequence_by |
str, col() 또는 struct() |
필수 사항입니다. 원본 데이터에서 CDC 이벤트의 논리적 순서를 지정하는 열 이름입니다. Lakeflow Spark 선언적 파이프라인은 이 시퀀싱을 사용하여 순서가 잘못 도착하는 변경 이벤트를 처리합니다. 지정된 열은 정렬 가능한 데이터 형식이어야 합니다. 다음 중 하나를 지정할 수 있습니다.
|
ignore_null_updates |
bool |
대상 열의 하위 집합을 포함하는 업데이트를 수집할 수 있습니다. CDC 이벤트가 기존 행과 일치하고 ignore_null_updates가 True인 경우, null가 있는 열은 대상 내에서 기존 값을 유지합니다. 값이 null인 중첩 열에도 적용됩니다.
ignore_null_updates가 False일 때, null 값으로 기존 값을 덮어씁니다.기본값은 False입니다. |
apply_as_deletes |
str 또는 expr() |
CDC 이벤트를 DELETE로 처리해야 하는 시기를 upsert 대신 지정합니다. 다음 중 하나를 지정할 수 있습니다.
순서가 다른 데이터를 처리하기 위해 삭제된 행은 기본 델타 테이블에서 임시로 삭제 표시로 유지되고 메타스토어에서 이러한 삭제 표시를 필터링하는 뷰가 만들어집니다. 보존 간격은 기본적으로 2일로 설정되며 테이블 속성으로 pipelines.cdc.tombstoneGCThresholdInSeconds 구성할 수 있습니다. |
apply_as_truncates |
str 또는 expr() |
CDC 이벤트를 전체 테이블 TRUNCATE로 처리해야 하는 시기를 지정합니다. 다음 중 하나를 지정할 수 있습니다.
이 절은 대상 테이블의 전체 잘림을 트리거하므로 이 기능이 필요한 특정 사용 사례에만 사용해야 합니다. 매개 apply_as_truncates 변수는 SCD 형식 1에 대해서만 지원됩니다. SCD 유형 2는 자르기 작업을 지원하지 않습니다. |
column_list 또는 except_column_list |
list |
대상 테이블에 포함할 열의 하위 집합입니다. 포함할 열의 전체 목록을 지정하는 데 사용합니다 column_list . 제외할 열을 지정하는 데 사용합니다 except_column_list . 값을 문자열 목록 또는 Spark SQL col() 함수로 선언할 수 있습니다.
함수에 대한 col() 인수에는 한정자를 포함할 수 없습니다. 예를 들어 사용할 col(userId)수 있지만 사용할 col(source.userId)수는 없습니다. 기본값은 함수에 인수 column_list 또는 except_column_list이 전달되지 않을 때 대상 테이블의 모든 열을 포함하는 것입니다. |
stored_as_scd_type |
str 또는 int |
레코드를 SCD 형식 1 또는 SCD 형식 2로 저장할지 여부입니다.
1 SCD 유형 1 또는 2 SCD 유형 2로 설정합니다. 기본값은 SCD 형식 1입니다. |
track_history_column_list 또는 track_history_except_column_list |
list |
대상 테이블의 기록에 대해 추적할 출력 열의 하위 집합입니다. 추적할 열의 전체 목록을 지정하는 데 사용합니다 track_history_column_list . 추적에서 제외할 열을 지정하는 데 사용합니다 track_history_except_column_list . 값을 문자열 목록 또는 Spark SQL col() 함수로 선언할 수 있습니다.
함수에 대한 col() 인수에는 한정자를 포함할 수 없습니다. 예를 들어 사용할 col(userId)수 있지만 사용할 col(source.userId)수는 없습니다. 기본값은 함수에 인수 track_history_column_list 또는 track_history_except_column_list이 전달되지 않을 때 대상 테이블의 모든 열을 포함하는 것입니다. |
name |
str |
흐름 이름입니다. 제공되지 않은 경우 기본값은 .와 동일한 값으로 target설정됩니다. |
once |
bool |
필요에 따라 흐름을 백필과 같은 일회성 흐름으로 정의합니다.
once=True 다음 두 가지 방법으로 흐름을 변경합니다.
|