문을 사용해 파이프라인의 CREATE FLOW 테이블에 대한 흐름을 생성하거나 백필을 만듭니다.
Syntax
CREATE FLOW flow_name [COMMENT comment] AS
{
AUTO CDC INTO target_table create_auto_cdc_flow_spec |
INSERT INTO [ONCE] target_table BY NAME query
}
매개 변수
flow_name
생성할 흐름의 이름입니다.
주석
흐름에 대한 선택적 설명입니다.
-
AUTO CDC ... INTO을 사용하여 흐름을 정의하는 문입니다create_auto_cdc_flow_spec.AUTO CDC ... INTO문 또는INSERT INTO문 중 하나를 포함해야 합니다. 데이터 변경 의미 체계를 소스 쿼리에서 사용할 때AUTO CDC ... INTO를 사용합니다.자세한 내용은 AUTO CDC INTO(파이프라인)를 참조하세요.
target_table
업데이트할 테이블입니다. 스트리밍 테이블이어야 합니다.
INSERT 로
대상 테이블에 삽입되는 테이블 쿼리를 정의합니다.
ONCE옵션이 제공되지 않으면 쿼리는 스트리밍 쿼리여야 합니다. 소스에서 읽기 위해 스트리밍 의미론을 사용하려면 STREAM 키워드를 사용하세요. 기존 레코드에 대한 변경이나 삭제가 발생하면 읽기 작업 중 오류가 발생합니다. 가장 안전한 방법은 정적 또는 추가 전용 소스에서 읽는 것입니다. 변경 커밋이 포함된 데이터를 수집하려면, Python과SkipChangeCommits옵션을 사용하여 오류를 처리할 수 있습니다.INSERT INTO는AUTO CDC ... INTO와 서로 배타적입니다. 원본 데이터에 CDC(변경 데이터 캡처) 기능이 포함된 경우에 사용합니다AUTO CDC ... INTO. 원본이 사용하지 않는 경우 사용합니다INSERT INTO.스트리밍 데이터에 대한 자세한 내용은 파이프라인 사용하여 데이터 변환참조하세요.
한번
필요에 따라 흐름을 백필과 같은 일회성 흐름으로 정의합니다.
ONCE다음 두 가지 방법으로 흐름을 변경합니다.- 원본
query이거나create_auto_cdc_flow_spec스트리밍 테이블이 아닙니다. - 흐름은 기본적으로 한 번 실행됩니다. 파이프라인이 전체 새로 고침으로 업데이트되면 흐름이
ONCE다시 실행되어 데이터를 다시 만듭니다.
- 원본
예시
-- EXAMPLE 1:
-- Create a streaming table, and add two flows that append data to it:
CREATE OR REFRESH STREAMING TABLE users;
-- first flow into target_table:
CREATE FLOW users_flow AS
INSERT INTO users
SELECT * FROM stream(raw_data.users);
-- second flow into target_table:
CREATE FLOW backfill_users AS
INSERT INTO ONCE users
SELECT * FROM user_backfill_table;
-- EXAMPLE 2:
-- Create a streaming table, and add a flow that applies CDC changes to it:
CREATE OR REFRESH STREAMING TABLE admins_cdc_target_table;
-- first flow into target_table:
CREATE FLOW admin_cdc_flow AS
AUTO CDC INTO admins_cdc_target_table
FROM stream(cdc_data.admins)
KEYS (userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY sequenceNum
COLUMNS * EXCEPT (operation, sequenceNum)
STORED AS SCD TYPE 2;