다음을 통해 공유


AUTO CDC INTO(파이프라인)

문을 AUTO CDC ... INTO 사용하여 Lakeflow Spark 선언적 파이프라인 CDC(변경 데이터 캡처) 기능을 사용하는 흐름을 만듭니다. 이 문은 CDC 원본에서 변경 내용을 읽고 스트리밍 대상에 적용합니다.

Syntax

CREATE OR REFRESH STREAMING TABLE table_name;

CREATE FLOW flow_name AS AUTO CDC INTO table_name
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]

다른 파이프라인 쿼리와 동일한 CONSTRAINT 절을 사용하여 대상에 대한 데이터 품질 제약 조건을 정의합니다. 파이프라인 기대를 사용하여 데이터 품질을 관리하기를 참조하세요.

이벤트에 대한 INSERTUPDATE 기본 동작은 원본에서 CDC 이벤트를 upsert 하는 것입니다. 대상 테이블의 지정된 키와 일치하는 행을 업데이트하거나 대상 테이블에 일치하는 레코드가 없는 경우 새 행을 삽입합니다. DELETE 조건을 사용하여 APPLY AS DELETE WHEN 이벤트 처리를 지정할 수 있습니다.

중요합니다

변경 내용을 적용하려면 대상 스트리밍 테이블을 선언해야 합니다. 필요에 따라 대상 테이블에 대한 스키마를 지정할 수 있습니다. SCD 형식 2 테이블의 경우, 대상 테이블의 스키마를 지정할 때 __START_AT 필드와 동일한 데이터 형식으로 __END_ATsequence_by 열을 포함해야 합니다.

AUTO CDC API: 파이프라인을 사용하여 변경 데이터 캡처 간소화를 참조하세요.

매개 변수

  • flow_name

    생성할 흐름의 이름입니다.

  • source

    데이터의 원본입니다. 원본은 스트리밍 원본이어야 합니다. 소스에서 읽기 위해 스트리밍 의미론을 사용하려면 STREAM 키워드를 사용하세요. 기존 레코드에 대한 변경이나 삭제가 발생하면 읽기 작업 중 오류가 발생합니다. 가장 안전한 방법은 정적 또는 추가 전용 소스에서 읽는 것입니다. 변경 커밋이 포함된 데이터를 수집하려면, Python과 SkipChangeCommits 옵션을 사용하여 오류를 처리할 수 있습니다.

    스트리밍 데이터에 대한 자세한 내용은 파이프라인 사용하여 데이터 변환참조하세요.

  • KEYS

    원본 데이터의 행을 고유하게 식별하는 열 또는 열의 조합입니다. 이러한 열의 값은 대상 테이블의 특정 레코드에 적용되는 CDC 이벤트를 식별하는 데 사용됩니다.

    열 조합을 정의하려면 쉼표로 구분된 열 목록을 사용합니다.

    이 절 ia가 필요합니다.

  • IGNORE NULL UPDATES

    대상 열의 하위 집합을 포함하는 업데이트를 수집할 수 있습니다. CDC 이벤트가 기존 행과 일치하고 IGNORE NULL UPDATES가 지정되면 값이 있는 null 열은 대상에 기존 값을 유지합니다. 값이 null인 중첩 열에도 적용됩니다.

    이 절은 선택 사항입니다.

    기본 설정은 null 값으로 기존 열을 덮어쓰는 것입니다.

  • APPLY AS DELETE WHEN

    CDC 이벤트를 DELETE로 처리해야 하는 시기를 upsert 대신 지정합니다.

    SCD 형식 2 원본의 경우 순서가 뒤바뀐 데이터를 처리하기 위해 삭제된 행은 기본 델타 테이블의 묘비 표시로 일시적으로 유지되며 이러한 묘비 표시를 필터링하는 뷰가 메타 데이터 저장소에 생성됩니다. 보존 간격은 pipelines.cdc.tombstoneGCThresholdInSeconds 사용하여 구성할 수 있습니다.

    이 절은 선택 사항입니다.

  • APPLY AS TRUNCATE WHEN

    CDC 이벤트를 전체 테이블 TRUNCATE로 처리해야 하는 시기를 지정합니다. 이 절은 대상 테이블의 전체 잘림을 트리거하므로 이 기능이 필요한 특정 사용 사례에만 사용해야 합니다.

    APPLY AS TRUNCATE WHEN 절은 SCD 유형 1에 대해서만 지원됩니다. SCD 유형 2는 자르기 작업을 지원하지 않습니다.

    이 절은 선택 사항입니다.

  • SEQUENCE BY

    원본 데이터에서 CDC 이벤트의 논리적 순서를 지정하는 열 이름입니다. 파이프라인 처리는 이 시퀀싱을 사용하여 순서가 잘못 도착하는 변경 이벤트를 처리합니다.

    시퀀싱에 여러 열이 필요한 경우 식을 사용합니다 STRUCT . 첫 번째 구조체 필드를 기준으로 순서를 지정한 다음, 동률인 경우 두 번째 필드를 기준으로 정렬합니다.

    지정된 열은 정렬 가능한 데이터 형식이어야 합니다.

    이 조항은 필수적입니다.

  • COLUMNS

    대상 테이블에 포함할 열의 하위 집합을 지정합니다. 다음 중 하나를 수행할 수 있습니다.

    • 포함할 열의 전체 목록 COLUMNS (userId, name, city)을 지정합니다.
    • 제외할 열 목록을 지정합니다. COLUMNS * EXCEPT (operation, sequenceNum)

    이 절은 선택 사항입니다.

    기본값은 절이 지정되지 않은 경우 COLUMNS 대상 테이블에 모든 열을 포함하는 것입니다.

  • STORED AS

    레코드를 SCD 형식 1 또는 SCD 형식 2로 저장할지 여부입니다.

    이 절은 선택 사항입니다.

    기본값은 SCD 형식 1입니다.

  • TRACK HISTORY ON

    지정된 열이 변경될 때 기록 레코드를 생성할 출력 열의 하위 집합을 지정합니다. 다음 중 하나를 수행할 수 있습니다.

    • COLUMNS (userId, name, city)를 추적할 열의 전체 목록을 지정합니다.
    • 추적에서 제외할 열 목록을 지정합니다. COLUMNS * EXCEPT (operation, sequenceNum)

    이 절은 선택 사항입니다. 기본값은 변경 내용이 있을 때 모든 출력 열에 대한 기록을 추적하는 것입니다 TRACK HISTORY ON *.

예시

-- Create a streaming table, then use AUTO CDC to populate it:
CREATE OR REFRESH STREAMING TABLE target;

CREATE FLOW flow
AS AUTO CDC INTO
  target
FROM stream(cdc_data.users)
  KEYS (userId)
  APPLY AS DELETE WHEN operation = "DELETE"
  SEQUENCE BY sequenceNum
  COLUMNS * EXCEPT (operation, sequenceNum)
  STORED AS SCD TYPE 2
  TRACK HISTORY ON * EXCEPT (city);