다음을 통해 공유


create_auto_cdc_flow

이 함수는 create_auto_cdc_flow() Lakeflow Spark 선언적 파이프라인 CDC(변경 데이터 캡처) 기능을 사용하여 CDF(변경 데이터 피드)에서 원본 데이터를 처리하는 흐름을 만듭니다.

비고

이 함수는 이전 함수 apply_changes()를 대체합니다. 두 함수의 서명은 동일합니다. Databricks는 새 이름을 사용하도록 업데이트하는 것이 좋습니다.

중요합니다

변경 내용을 적용하려면 대상 스트리밍 테이블을 선언해야 합니다. 필요에 따라 대상 테이블에 대한 스키마를 지정할 수 있습니다. 대상 테이블의 스키마를 지정할 때, create_auto_cdc_flow() 열과 __START_AT 열을 __END_AT 필드와 동일한 데이터 형식으로 포함해야 합니다.

필요한 대상 테이블을 만들려면 파이프라인 Python 인터페이스에서 create_streaming_table() 함수를 사용할 수 있습니다.

Syntax

from pyspark import pipelines as dp

dp.create_auto_cdc_flow(
  target = "<target-table>",
  source = "<data-source>",
  keys = ["key1", "key2", "keyN"],
  sequence_by = "<sequence-column>",
  ignore_null_updates = <bool>,
  apply_as_deletes = None,
  apply_as_truncates = None,
  column_list = None,
  except_column_list = None,
  stored_as_scd_type = <type>,
  track_history_column_list = None,
  track_history_except_column_list = None,
  name = None,
  once = <bool>
)

처리를 위해 create_auto_cdc_flow 이벤트에 대한 INSERTUPDATE 기본 동작은 원본에서 CDC 이벤트를 upsert 하는 것입니다. 지정된 키와 일치하는 대상 테이블의 행을 업데이트하거나 대상 테이블에 일치하는 레코드가 없을 때 새 행을 삽입합니다. DELETE 이벤트 처리는 apply_as_deletes 매개 변수를 사용하여 지정할 수 있습니다.

변경 피드를 사용한 CDC 처리에 대한 자세한 내용은 AUTO CDC API: 파이프라인을 사용하여 변경 데이터 캡처 간소화를 참조하세요. 함수를 사용하는 create_auto_cdc_flow() 예제는 예제: CDF 원본 데이터를 사용한 SCD 형식 1 및 SCD 형식 2 처리를 참조하세요.

매개 변수

매개 변수 유형 Description
target str 필수 사항입니다. 업데이트할 테이블의 이름입니다. 함수를 실행하기 전에 create_streaming_table() 함수를 사용하여 대상 테이블을 만들 수 있습니다 create_auto_cdc_flow() .
source str 필수 사항입니다. CDC 레코드를 포함하는 데이터 원본입니다.
keys list 필수 사항입니다. 원본 데이터의 행을 고유하게 식별하는 열 또는 열의 조합입니다. 이는 대상 테이블의 특정 레코드에 적용되는 CDC 이벤트를 식별하는 데 사용됩니다. 다음 중 하나를 지정할 수 있습니다.
  • 문자열 목록: ["userId", "orderId"]
  • Spark SQL col() 함수 목록: [col("userId"), col("orderId")]. 함수에 대한 col() 인수에는 한정자를 포함할 수 없습니다. 예를 들어 사용할 col(userId)수 있지만 사용할 col(source.userId)수는 없습니다.
sequence_by str, col() 또는 struct() 필수 사항입니다. 원본 데이터에서 CDC 이벤트의 논리적 순서를 지정하는 열 이름입니다. Lakeflow Spark 선언적 파이프라인은 이 시퀀싱을 사용하여 순서가 잘못 도착하는 변경 이벤트를 처리합니다. 지정된 열은 정렬 가능한 데이터 형식이어야 합니다. 다음 중 하나를 지정할 수 있습니다.
  • 문자열: "sequenceNum"
  • Spark SQL col() 함수: col("sequenceNum"). 함수에 대한 col() 인수에는 한정자를 포함할 수 없습니다. 예를 들어 사용할 col(userId)수 있지만 사용할 col(source.userId)수는 없습니다.
  • struct() 여러 열을 결합하여 관계를 struct("timestamp_col", "id_col")끊습니다. 첫 번째 구조체 필드를 기준으로 정렬한 다음, 동률인 경우 두 번째 필드를 기준으로 정렬합니다.
ignore_null_updates bool 대상 열의 하위 집합을 포함하는 업데이트를 수집할 수 있습니다. CDC 이벤트가 기존 행과 일치하고 ignore_null_updatesTrue인 경우, null가 있는 열은 대상 내에서 기존 값을 유지합니다. 값이 null인 중첩 열에도 적용됩니다. ignore_null_updatesFalse일 때, null 값으로 기존 값을 덮어씁니다.
기본값은 False입니다.
apply_as_deletes str 또는 expr() CDC 이벤트를 DELETE로 처리해야 하는 시기를 upsert 대신 지정합니다. 다음 중 하나를 지정할 수 있습니다.
  • 문자열: "Operation = 'DELETE'"
  • Spark SQL expr() 함수: expr("Operation = 'DELETE'")

순서가 다른 데이터를 처리하기 위해 삭제된 행은 기본 델타 테이블에서 임시로 삭제 표시로 유지되고 메타스토어에서 이러한 삭제 표시를 필터링하는 뷰가 만들어집니다. 보존 간격은 기본적으로 2일로 설정되며 테이블 속성으로 pipelines.cdc.tombstoneGCThresholdInSeconds 구성할 수 있습니다.
apply_as_truncates str 또는 expr() CDC 이벤트를 전체 테이블 TRUNCATE로 처리해야 하는 시기를 지정합니다. 다음 중 하나를 지정할 수 있습니다.
  • 문자열: "Operation = 'TRUNCATE'"
  • Spark SQL expr() 함수: expr("Operation = 'TRUNCATE'")

이 절은 대상 테이블의 전체 잘림을 트리거하므로 이 기능이 필요한 특정 사용 사례에만 사용해야 합니다. 매개 apply_as_truncates 변수는 SCD 형식 1에 대해서만 지원됩니다. SCD 유형 2는 자르기 작업을 지원하지 않습니다.
column_list 또는 except_column_list list 대상 테이블에 포함할 열의 하위 집합입니다. 포함할 열의 전체 목록을 지정하는 데 사용합니다 column_list . 제외할 열을 지정하는 데 사용합니다 except_column_list . 값을 문자열 목록 또는 Spark SQL col() 함수로 선언할 수 있습니다.
  • column_list = ["userId", "name", "city"]
  • column_list = [col("userId"), col("name"), col("city")]
  • except_column_list = ["operation", "sequenceNum"]
  • except_column_list = [col("operation"), col("sequenceNum")

함수에 대한 col() 인수에는 한정자를 포함할 수 없습니다. 예를 들어 사용할 col(userId)수 있지만 사용할 col(source.userId)수는 없습니다. 기본값은 함수에 인수 column_list 또는 except_column_list이 전달되지 않을 때 대상 테이블의 모든 열을 포함하는 것입니다.
stored_as_scd_type str 또는 int 레코드를 SCD 형식 1 또는 SCD 형식 2로 저장할지 여부입니다. 1 SCD 유형 1 또는 2 SCD 유형 2로 설정합니다. 기본값은 SCD 형식 1입니다.
track_history_column_list 또는 track_history_except_column_list list 대상 테이블의 기록에 대해 추적할 출력 열의 하위 집합입니다. 추적할 열의 전체 목록을 지정하는 데 사용합니다 track_history_column_list . 추적에서 제외할 열을 지정하는 데 사용합니다 track_history_except_column_list . 값을 문자열 목록 또는 Spark SQL col() 함수로 선언할 수 있습니다.
  • track_history_column_list = ["userId", "name", "city"]
  • track_history_column_list = [col("userId"), col("name"), col("city")]
  • track_history_except_column_list = ["operation", "sequenceNum"]
  • track_history_except_column_list = [col("operation"), col("sequenceNum")

함수에 대한 col() 인수에는 한정자를 포함할 수 없습니다. 예를 들어 사용할 col(userId)수 있지만 사용할 col(source.userId)수는 없습니다. 기본값은 함수에 인수 track_history_column_list 또는 track_history_except_column_list이 전달되지 않을 때 대상 테이블의 모든 열을 포함하는 것입니다.
name str 흐름 이름입니다. 제공되지 않은 경우 기본값은 .와 동일한 값으로 target설정됩니다.
once bool 필요에 따라 흐름을 백필과 같은 일회성 흐름으로 정의합니다. once=True 다음 두 가지 방법으로 흐름을 변경합니다.
  • 반환 값입니다. streaming-query; 이 경우는 스트리밍 데이터 프레임이 아닌 배치 DataFrame이어야 합니다.
  • 흐름은 기본적으로 한 번 실행됩니다. 파이프라인이 전체 새로 고침으로 업데이트되면 흐름이 ONCE 다시 실행되어 데이터를 다시 만듭니다.