다음을 통해 공유


CREATE FLOW(파이프라인)

문을 사용해 파이프라인의 CREATE FLOW 테이블에 대한 흐름을 생성하거나 백필을 만듭니다.

Syntax

CREATE FLOW flow_name [COMMENT comment] AS
{
  AUTO CDC INTO target_table create_auto_cdc_flow_spec |
  INSERT INTO [ONCE] target_table BY NAME query
}

매개 변수

  • flow_name

    생성할 흐름의 이름입니다.

  • 주석

    흐름에 대한 선택적 설명입니다.

  • 자동 CDC INTO

    AUTO CDC ... INTO 을 사용하여 흐름을 정의하는 문입니다create_auto_cdc_flow_spec. AUTO CDC ... INTO 문 또는 INSERT INTO 문 중 하나를 포함해야 합니다. 데이터 변경 의미 체계를 소스 쿼리에서 사용할 때 AUTO CDC ... INTO를 사용합니다.

    자세한 내용은 AUTO CDC INTO(파이프라인)를 참조하세요.

  • target_table

    업데이트할 테이블입니다. 스트리밍 테이블이어야 합니다.

  • INSERT 로

    대상 테이블에 삽입되는 테이블 쿼리를 정의합니다. ONCE 옵션이 제공되지 않으면 쿼리는 스트리밍 쿼리여야 합니다. 소스에서 읽기 위해 스트리밍 의미론을 사용하려면 STREAM 키워드를 사용하세요. 기존 레코드에 대한 변경이나 삭제가 발생하면 읽기 작업 중 오류가 발생합니다. 가장 안전한 방법은 정적 또는 추가 전용 소스에서 읽는 것입니다. 변경 커밋이 포함된 데이터를 수집하려면, Python과 SkipChangeCommits 옵션을 사용하여 오류를 처리할 수 있습니다.

    INSERT INTOAUTO CDC ... INTO와 서로 배타적입니다. 원본 데이터에 CDC(변경 데이터 캡처) 기능이 포함된 경우에 사용합니다 AUTO CDC ... INTO . 원본이 사용하지 않는 경우 사용합니다 INSERT INTO .

    스트리밍 데이터에 대한 자세한 내용은 파이프라인 사용하여 데이터 변환참조하세요.

  • 한번

    필요에 따라 흐름을 백필과 같은 일회성 흐름으로 정의합니다. ONCE 다음 두 가지 방법으로 흐름을 변경합니다.

    • 원본 query 이거나 create_auto_cdc_flow_spec 스트리밍 테이블이 아닙니다.
    • 흐름은 기본적으로 한 번 실행됩니다. 파이프라인이 전체 새로 고침으로 업데이트되면 흐름이 ONCE 다시 실행되어 데이터를 다시 만듭니다.

예시

-- EXAMPLE 1:
-- Create a streaming table, and add two flows that append data to it:
CREATE OR REFRESH STREAMING TABLE users;

-- first flow into target_table:
CREATE FLOW users_flow AS
INSERT INTO users
SELECT * FROM stream(raw_data.users);

-- second flow into target_table:
CREATE FLOW backfill_users AS
INSERT INTO ONCE users
SELECT * FROM user_backfill_table;

-- EXAMPLE 2:
-- Create a streaming table, and add a flow that applies CDC changes to it:
CREATE OR REFRESH STREAMING TABLE admins_cdc_target_table;

-- first flow into target_table:
CREATE FLOW admin_cdc_flow AS
AUTO CDC INTO admins_cdc_target_table
FROM stream(cdc_data.admins)
KEYS (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY sequenceNum
COLUMNS * EXCEPT (operation, sequenceNum)
STORED AS SCD TYPE 2;