다음을 통해 공유


흐름 생성 (Lakeflow 선언적 파이프라인)

CREATE FLOW 문장을 사용하여 Lakeflow 선언적 파이프라인 테이블에 대한 플로우 또는 백필 작업을 만드세요.

문법

CREATE FLOW flow_name [COMMENT comment] AS
{
  AUTO CDC INTO target_table create_auto_cdc_flow_spec |
  INSERT INTO [ONCE] target_table BY NAME query
}

매개 변수

  • flow_name

    생성할 흐름의 이름입니다.

  • 주석

    흐름에 대한 선택적 설명입니다.

  • 자동 CDC INTO

    AUTO CDC ... INTO 을 사용하여 흐름을 정의하는 문입니다create_auto_cdc_flow_spec. AUTO CDC ... INTO 문 또는 INSERT INTO 문을 포함해야 합니다. 원본 쿼리가 변경 데이터 의미 체계를 사용할 때 AUTO CDC ... INTO을/를 사용합니다.

    자세한 내용은 Lakeflow 선언적 파이프라인의 AUTO CDC INTO를 참조하세요.

  • 대상_테이블

    업데이트할 테이블입니다. 스트리밍 테이블이어야 합니다.

  • INSERT 안으로

    대상 테이블에 삽입되는 테이블 쿼리를 정의합니다. ONCE 옵션이 제공되지 않으면 쿼리는 스트리밍 쿼리여야 합니다. 소스에서 읽기 위해 스트리밍 의미론을 사용하려면 STREAM 키워드를 사용하세요. 기존 레코드에 대한 변경이나 삭제가 발생하면 읽기 작업 중 오류가 발생합니다. 가장 안전한 방법은 정적 또는 추가 전용 소스에서 읽는 것입니다. 변경 커밋이 포함된 데이터를 수집하려면, Python과 SkipChangeCommits 옵션을 사용하여 오류를 처리할 수 있습니다.

    INSERT INTOAUTO CDC ... INTO는 상호 배타적입니다. 원본 데이터에 CDC(변경 데이터 캡처) 기능이 포함된 경우에 사용합니다 AUTO CDC ... INTO . 원본에 INSERT INTO가 없는 경우 사용하십시오.

    스트리밍 데이터에 대한 자세한 내용은 파이프라인 사용하여 데이터 변환참조하세요.

  • 한때

    흐름을 백필과 같은 일회성 흐름으로 정의할 수 있습니다. 이는 필요에 따라 선택적으로 수행할 수 있습니다. ONCE 다음 두 가지 방법으로 흐름을 변경합니다.

    • 원본 query 이거나 create_auto_cdc_flow_spec 스트리밍 테이블이 아닙니다.
    • 흐름은 기본적으로 한 번 실행됩니다. 파이프라인이 전체 새로 고침으로 업데이트되면 흐름이 ONCE 다시 실행되어 데이터를 다시 만듭니다.

예시

-- EXAMPLE 1:
-- Create a streaming table, and add two flows that append data to it:
CREATE OR REFRESH STREAMING TABLE users;

-- first flow into target_table:
CREATE FLOW users_flow AS
INSERT INTO users
SELECT * FROM stream(raw_data.users);

-- second flow into target_table:
CREATE FLOW backfill_users AS
INSERT INTO ONCE users
SELECT * FROM user_backfill_table;

-- EXAMPLE 2:
-- Create a streaming table, and add a flow that applies CDC changes to it:
CREATE OR REFRESH STREAMING TABLE admins_cdc_target_table;

-- first flow into target_table:
CREATE FLOW admin_cdc_flow AS
AUTO CDC INTO admins_cdc_target_table
FROM stream(cdc_data.admins)
KEYS (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY sequenceNum
COLUMNS * EXCEPT (operation, sequenceNum)
STORED AS SCD TYPE 2;